从demo的dvbinfo.c中的main函数入口分析: 为了分析方便,此处将宏HAVE_SYS_SOCKET_H隔离的socket代码去掉,只关注libdvbpsi本身的实现。 1.数据结构
从demo的dvbinfo.c中的main函数入口分析:
为了分析方便,此处将宏HAVE_SYS_SOCKET_H隔离的socket代码去掉,只关注libdvbpsi本身的实现。
1.数据结构的设计:
1.1、捕获器capture的数据结构设计如下:
由数据结构可知,由于demo是本地的autotest,所以用有名管道fifo作测试。从命令行终端读入ts流文件,一个typedef struct dvbinfo_capture_s{ fifo_t *fifo; fifo_t *empty; pthread_mutex_t lock; pthread_cond_t fifo_full; bool b_fifo_full; size_t size; /* prefered capture size */ params_t *params; bool b_alive;} dvbinfo_capture_t;
线程不断将buffer中的数据push进入fifo,同时主线程不断从fifo pop出数据并解析,直到从fifo中读出的数据为0
1.2、dvbinfo_capture_t中的params_t 结构体
/* Program parameters, I/O descriptors and summary settings. */
typedef struct params_s
{
    /* parameters */
    char *output;
    char *input;
    int   port;
    char *mcast_interface;

    bool  b_udp;
    bool  b_tcp;
    bool  b_file;

    /* descriptors handed to pf_read / pf_write */
    int   fd_in;
    int   fd_out;

    int   debug;
    bool  b_verbose;
    bool  b_monitor;   /* run in daemon mode */

    /* statistics */
    bool  b_summary;   /* write summary */
    struct summary_s
    {
        int      mode;    /* one of: i_summary_mode */
        int64_t  period;  /* summary period in ms */
        char    *file;    /* summary file name */
        FILE    *fd;      /* summary file descriptor */
    } summary;

    /* read data from / write data to a file or socket */
    ssize_t (*pf_read)(int fd, void *buf, size_t count);
    ssize_t (*pf_write)(int fd, const void *buf, size_t count);
} params_t;
2.main函数入口
3.线程实例(routine)dvbinfo_capture的分析int main(int argc, char **pp_argv){ dvbinfo_capture_t capture; params_t *param = NULL; char c; if (argc == 1) usage(); param = params_init(); capture.params = param; capture.fifo = fifo_new(); capture.empty = fifo_new(); capture.b_fifo_full = false; pthread_mutex_init(&capture.lock, NULL); //锁的初始化 pthread_cond_init(&capture.fifo_full, NULL); //条件变量初始化 static const struct option long_options[] = { { "debug", required_argument, NULL, 'd' }, { "help", no_argument, NULL, 'h' }, /* - inputs - */ { "file", required_argument, NULL, 'f' }, { NULL, 0, NULL, 0 } }; /* 解析命令行参数 */ while ((c = getopt_long(argc, pp_argv, "d:f:h", long_options, NULL)) != -1) { switch(c) { case 'd': if (optarg) { param->debug = 0; if (strncmp(optarg, "error", 5) == 0) param->debug = 1; else if (strncmp(optarg, "warn", 4) == 0) param->debug = 2; else if (strncmp(optarg, "debug", 5) == 0) param->debug = 3; } break; case 'f': if (optarg) { /*读入ts流文件,注意asprintf的用法*/ if (asprintf(¶m->input, "%s", optarg) < 0) { fprintf(stderr, "error: out of memory/n"); params_free(param); usage(); } /* */ param->pf_read = read; param->b_file = true; } break; case ':': fprintf(stderr, "Option %c is missing arguments/n", c); params_free(param); exit(EXIT_FAILURE); break; case '?': fprintf(stderr, "Unknown option %c found/n", c); params_free(param); exit(EXIT_FAILURE); break; case 'h': default: params_free(param); usage(); break; } }; if (param->input == NULL) { libdvbpsi_log(param, DVBINFO_LOG_ERROR, "No source given/n"); params_free(param); usage(); /* exits application */ } { capture.size = 188; //ts packet的大小固定为188字节 libdvbpsi_log(param, DVBINFO_LOG_INFO, "Examining: %s/n", param->input); } /* Capture thread */ dvbinfo_open(param); //打开ts流文件,获取文件描述符 pthread_t handle; //线程pid capture.b_alive = true; /*创建capture的线程*/ if (pthread_create(&handle, NULL, dvbinfo_capture, (void *)&capture) < 0) { libdvbpsi_log(param, DVBINFO_LOG_ERROR, "failed creating thread/n"); dvbinfo_close(param); 
params_free(param); exit(EXIT_FAILURE); } /*从fifo中不断读数据,同时解析PSI表*/ int err = dvbinfo_process(&capture); /*如果fifo中读取数据为0,表示ts流文件解析完毕。将cpature的alive标志位置为false,并终止线程*/ capture.b_alive = false; /* stop thread */ if (pthread_join(handle, NULL) < 0) libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error joining capture thread/n"); dvbinfo_close(param); /* cleanup */ fifo_wake((&capture)->fifo); fifo_wake((&capture)->empty); fifo_free((&capture)->fifo); fifo_free((&capture)->empty); pthread_mutex_destroy(&capture.lock); pthread_cond_destroy(&capture.fifo_full); params_free(param); if (err < 0) exit(EXIT_FAILURE); else exit(EXIT_SUCCESS);}
【注解】线程dvbinfo_capture不断地将ts流文件中的数据push进fifo;与此同时,dvbinfo_process并发地从fifo中取出数据,交给各个specific decoder进行section的解析,完成table的重建。直到线程dvbinfo_capture将ts流文件读取完毕,并且dvbinfo_process处理完成后,才将capture的活动标志位capture.b_alive置为false,最后终止线程并释放线程资源。这是典型的“生产者——消费者”模式。
4、dvbinfo_process负责ts流中各种table的解析static void *dvbinfo_capture(void *data){ dvbinfo_capture_t *capture = (dvbinfo_capture_t *)data; const params_t *param = capture->params; bool b_eof = false; while (capture->b_alive && !b_eof) { buffer_t *buffer; if (fifo_count(capture->empty) == 0) buffer = buffer_new(capture->size); else buffer = fifo_pop(capture->empty); if (buffer == NULL) /* out of memory */ break; ssize_t size = param->pf_read(param->fd_in, buffer->p_data, buffer->i_size); if (size < 0) /* short read ? */ { fifo_push(capture->empty, buffer); continue; } else if (size == 0) { fifo_push(capture->empty, buffer); b_eof = true; continue; } buffer->i_date = mdate(); /* check fifo size */ if (fifo_size(capture->fifo) >= FIFO_THRESHOLD_SIZE) { pthread_mutex_lock(&capture->lock); capture->b_fifo_full = true; pthread_mutex_unlock(&capture->lock); if (param->b_file) { /* wait till buffer becomes smaller again */ pthread_mutex_lock(&capture->lock); while(capture->b_fifo_full) pthread_cond_wait(&capture->fifo_full, &capture->lock); pthread_mutex_unlock(&capture->lock); } else { libdvbpsi_log(capture->params, DVBINFO_LOG_ERROR, "error fifo full discarding buffer"); fifo_push(capture->empty, buffer); continue; } } /* store buffer */ fifo_push(capture->fifo, buffer); buffer = NULL; } capture->b_alive = false; fifo_wake(capture->fifo); return NULL;}
/**
 * Processing loop: pops buffers from capture->fifo, optionally copies them to
 * the output through param->pf_write, feeds them to libdvbpsi_process() and
 * returns each buffer to capture->empty for reuse.
 *
 * When param->b_summary is set, a summary is written to "<summary.file>.part"
 * and renamed over summary.file each time the deadline expires.
 *
 * The loop ends when the capture side is no longer alive and the fifo is
 * empty, or on the first write or processing error.
 *
 * @param capture  shared capture state
 * @return 0 when all data was processed without error, -1 otherwise
 */
static int dvbinfo_process(dvbinfo_capture_t *capture)
{
    int err = -1;
    bool b_error = false;
    params_t *param = capture->params;
    buffer_t *buffer = NULL;

    char *psz_temp = NULL;
    mtime_t deadline = 0;
    if (param->b_summary)
    {
        if (asprintf(&psz_temp, "%s.part", param->summary.file) < 0)
        {
            libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                          "Could not create temporary summary file %s\n",
                          param->summary.file);
            return err;
        }
        deadline = mdate() + param->summary.period;
    }

    /* create the MPEG-TS PSI decoders */
    ts_stream_t *stream = libdvbpsi_init(param->debug, &libdvbpsi_log, (void *)param);
    if (!stream)
        goto out;

    while (!b_error)
    {
        /* Wait till fifo has emptied */
        if (!capture->b_alive && (fifo_count(capture->fifo) == 0))
            break;

        /* Wait for data to arrive */
        buffer = fifo_pop(capture->fifo);
        if (buffer == NULL)
            continue;

        if (param->output)
        {
            /* Must be signed: with an unsigned type a -1 error return is
             * neither "< 0" nor "< i_size", so failures went unnoticed. */
            ssize_t size = param->pf_write(param->fd_out, buffer->p_data, buffer->i_size);
            if (size < 0) /* error writing */
            {
                libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error (%d) writting to %s",
                              errno, param->output);
                b_error = true;
                break;
            }
            else if ((size_t)size < buffer->i_size) /* short write, disk full? */
            {
                libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error writting to %s (disk full?)",
                              param->output);
                b_error = true;
                break;
            }
        }

        if (!libdvbpsi_process(stream, buffer->p_data, buffer->i_size, buffer->i_date))
            b_error = true;

        /* summary statistics */
        if (param->b_summary)
        {
            if (mdate() >= deadline)
            {
                FILE *fd = fopen(psz_temp, "w+");
                if (fd)
                {
                    libdvbpsi_summary(fd, stream, param->summary.mode);
                    fflush(fd);
                    fclose(fd);
                    unlink(param->summary.file);
                    rename(psz_temp, param->summary.file);
                }
                else
                {
                    libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                                  "failed opening summary file (disabling summary logging)\n");
                    param->b_summary = false;
                }
                deadline = mdate() + param->summary.period;
            }
        }

        /* reuse buffer */
        fifo_push(capture->empty, buffer);
        buffer = NULL;

        /* check fifo size: let a waiting capture thread continue */
        if (fifo_size(capture->fifo) < FIFO_THRESHOLD_SIZE)
        {
            pthread_mutex_lock(&capture->lock);
            capture->b_fifo_full = false;
            pthread_cond_signal(&capture->fifo_full);
            pthread_mutex_unlock(&capture->lock);
        }
    }

    /* The fifo is only guaranteed to be drained on a clean exit; after an
     * error the loop stops early and buffers may still be queued. */
    assert(b_error || fifo_count(capture->fifo) == 0);

    libdvbpsi_exit(stream);
    if (!b_error)
        err = 0;

out:
    if (b_error)
        libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error while processing\n");

    if (buffer)
        buffer_free(buffer);
    free(psz_temp);
    return err;
}