侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

libdvbpsi源码分析(二)main函数

2022-06-28 星期二 / 0 评论 / 0 点赞 / 69 阅读 / 11870 字

从demo的dvbinfo.c中的main函数入口分析: 为了分析方便,此处将宏HAVE_SYS_SOCKET_H隔离的socket代码去掉,只关注libdvbpsi本身的实现。 1.数据结构

        从demo的dvbinfo.c中的main函数入口分析:
        为了分析方便,此处将宏HAVE_SYS_SOCKET_H隔离的socket代码去掉,只关注libdvbpsi本身的实现。

    1.数据结构的设计:
1.1、捕获器capture的数据结构设计如下:

/*
 * State shared between the capture thread (dvbinfo_capture) and the
 * processing loop (dvbinfo_process).
 */
typedef struct dvbinfo_capture_s
{
    /* Filled buffers: pushed by dvbinfo_capture(), popped by dvbinfo_process(). */
    fifo_t          *fifo;

    /* Recycled buffers: pushed back after use, popped again by dvbinfo_capture(). */
    fifo_t          *empty;

    /* Held while b_fifo_full is read or written. */
    pthread_mutex_t  lock;

    /* Signalled by dvbinfo_process() once the fifo drops below the threshold. */
    pthread_cond_t   fifo_full;

    /* Set by dvbinfo_capture() when the fifo reaches FIFO_THRESHOLD_SIZE. */
    bool             b_fifo_full;

    /* Preferred capture size; handed to buffer_new() for fresh buffers. */
    size_t           size;

    /* Runtime parameters (input descriptor, read callback, ...). */
    params_t        *params;

    /* True while capturing should continue; cleared to stop the thread. */
    bool             b_alive;
} dvbinfo_capture_t;
由数据结构可知,由于demo是本地的autotest,所以用fifo(进程内的先进先出缓冲队列,并非操作系统的有名管道)作测试。从命令行终端读入ts流文件,一个线程不断将buffer中的数据push进入fifo,同时主线程不断从fifo pop出数据并解析,直到从fifo中读出的数据为0。

1.2、dvbinfo_capture_t中的params_t 结构体
/*
 * Runtime configuration of the dvbinfo tool: where to read from, where to
 * write to, logging verbosity and optional summary reporting.
 */
typedef struct params_s
{
    /* ---- command-line parameters ---- */
    char *output;
    char *input;
    int   port;
    char *mcast_interface;
    bool  b_udp;
    bool  b_tcp;
    bool  b_file;

    /* ---- descriptors and flags ---- */
    int   fd_in;
    int   fd_out;
    int   debug;
    bool  b_verbose;
    bool  b_monitor;            /* run in daemon mode */

    /* ---- statistics ---- */
    bool  b_summary;            /* write summary */
    struct summary_s
    {
        int      mode;          /* one of: i_summary_mode */
        int64_t  period;        /* summary period in ms */
        char    *file;          /* summary file name */
        FILE    *fd;            /* summary file descriptor */
    } summary;

    /* ---- I/O callbacks: read data from a file or a socket ---- */
    ssize_t (*pf_read)(int fd, void *buf, size_t count);
    ssize_t (*pf_write)(int fd, const void *buf, size_t count);
} params_t;

2.main函数入口
int main(int argc, char **pp_argv){    dvbinfo_capture_t capture;    params_t *param = NULL;    char c;       if (argc == 1)        usage();    param = params_init();    capture.params = param;    capture.fifo = fifo_new();    capture.empty = fifo_new();    capture.b_fifo_full = false;    pthread_mutex_init(&capture.lock, NULL);		//锁的初始化    pthread_cond_init(&capture.fifo_full, NULL);	//条件变量初始化    static const struct option long_options[] =    {        { "debug",     required_argument, NULL, 'd' },        { "help",      no_argument,       NULL, 'h' },        /* - inputs - */        { "file",      required_argument, NULL, 'f' },        { NULL, 0, NULL, 0 }    };	/*	解析命令行参数	*/    while ((c = getopt_long(argc, pp_argv, "d:f:h", long_options, NULL)) != -1)    {        switch(c)        {            case 'd':                if (optarg)                {                    param->debug = 0;                    if (strncmp(optarg, "error", 5) == 0)                        param->debug = 1;                    else if (strncmp(optarg, "warn", 4) == 0)                        param->debug = 2;                    else if (strncmp(optarg, "debug", 5) == 0)                        param->debug = 3;                }                break;            case 'f':                if (optarg)                {					/*读入ts流文件,注意asprintf的用法*/                    if (asprintf(&param->input, "%s", optarg) < 0)                    {                        fprintf(stderr, "error: out of memory/n");                        params_free(param);                        usage();                    }                    /* */                    param->pf_read = read;                    param->b_file = true;                }                break;            case ':':                fprintf(stderr, "Option %c is missing arguments/n", c);                params_free(param);                exit(EXIT_FAILURE);                break;            case '?':               fprintf(stderr, "Unknown option %c found/n", c);    
           params_free(param);               exit(EXIT_FAILURE);               break;            case 'h':            default:                params_free(param);                usage();                break;        }    };    if (param->input == NULL)    {        libdvbpsi_log(param, DVBINFO_LOG_ERROR, "No source given/n");        params_free(param);        usage(); /* exits application */    }    {        capture.size = 188;	//ts packet的大小固定为188字节        libdvbpsi_log(param, DVBINFO_LOG_INFO, "Examining: %s/n",                      param->input);    }    /* Capture thread */    dvbinfo_open(param);	//打开ts流文件,获取文件描述符    pthread_t handle;		//线程pid    capture.b_alive = true;	/*创建capture的线程*/    if (pthread_create(&handle, NULL, dvbinfo_capture, (void *)&capture) < 0)    {        libdvbpsi_log(param, DVBINFO_LOG_ERROR, "failed creating thread/n");        dvbinfo_close(param);		        params_free(param);        exit(EXIT_FAILURE);    }	/*从fifo中不断读数据,同时解析PSI表*/    int err = dvbinfo_process(&capture);	/*如果fifo中读取数据为0,表示ts流文件解析完毕。将cpature的alive标志位置为false,并终止线程*/    capture.b_alive = false;     /* stop thread */    if (pthread_join(handle, NULL) < 0)        libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error joining capture thread/n");    dvbinfo_close(param);    /* cleanup */    fifo_wake((&capture)->fifo);    fifo_wake((&capture)->empty);    fifo_free((&capture)->fifo);    fifo_free((&capture)->empty);    pthread_mutex_destroy(&capture.lock);    pthread_cond_destroy(&capture.fifo_full);    params_free(param);    if (err < 0)        exit(EXIT_FAILURE);    else        exit(EXIT_SUCCESS);}
3.线程实例(routine)dvbinfo_capture的分析
【注解】线程dvbinfo_capture不断地将ts流文件push进fifo, 同时并发地,dvbinfo_process不断地从fifo中取出数据并提供给每个specific decoder进行section的解析,完成table表的重建。直到线程dvbinfo_capture将ts流文件读取完毕,并且dvbinfo_process处理完成后,才将capture的活动标志位capture.b_alive置为false,最后终止线程并释放线程资源。典型的“生产者——消费者”模式。

static void *dvbinfo_capture(void *data){    dvbinfo_capture_t *capture = (dvbinfo_capture_t *)data;    const params_t *param = capture->params;    bool b_eof = false;    while (capture->b_alive && !b_eof)    {        buffer_t *buffer;        if (fifo_count(capture->empty) == 0)            buffer = buffer_new(capture->size);        else            buffer = fifo_pop(capture->empty);        if (buffer == NULL) /* out of memory */            break;        ssize_t size = param->pf_read(param->fd_in, buffer->p_data, buffer->i_size);        if (size < 0) /* short read ? */        {            fifo_push(capture->empty, buffer);            continue;        }        else if (size == 0)        {            fifo_push(capture->empty, buffer);            b_eof = true;            continue;        }        buffer->i_date = mdate();        /* check fifo size */        if (fifo_size(capture->fifo) >= FIFO_THRESHOLD_SIZE)        {            pthread_mutex_lock(&capture->lock);            capture->b_fifo_full = true;            pthread_mutex_unlock(&capture->lock);            if (param->b_file)            {                /* wait till buffer becomes smaller again */                pthread_mutex_lock(&capture->lock);                while(capture->b_fifo_full)                    pthread_cond_wait(&capture->fifo_full, &capture->lock);                pthread_mutex_unlock(&capture->lock);            }            else            {                libdvbpsi_log(capture->params, DVBINFO_LOG_ERROR,                          "error fifo full discarding buffer");                fifo_push(capture->empty, buffer);                continue;            }        }        /* store buffer */        fifo_push(capture->fifo, buffer);        buffer = NULL;    }    capture->b_alive = false;    fifo_wake(capture->fifo);    return NULL;}
4.dvbinfo_process负责ts流中各种table的解析
/*
 * Processing loop (consumer side).
 *
 * Pops buffers from capture->fifo, optionally copies them to the output
 * through the pf_write callback, feeds them to libdvbpsi_process() and
 * returns each buffer to capture->empty for reuse.  When summaries are
 * enabled, a summary is written to "<file>.part" every summary.period and
 * then renamed over the summary file.  The loop ends when the capture side
 * is no longer alive and the fifo is empty, or on the first error.
 *
 * capture: shared capture state.
 * Returns 0 when the input was processed completely, -1 when setup failed,
 * writing the output failed, or libdvbpsi_process() reported an error.
 */
static int dvbinfo_process(dvbinfo_capture_t *capture)
{
    int err = -1;
    bool b_error = false;
    params_t *param = capture->params;
    buffer_t *buffer = NULL;
    char *psz_temp = NULL;
    mtime_t deadline = 0;

    if (param->b_summary)
    {
        if (asprintf(&psz_temp, "%s.part", param->summary.file) < 0)
        {
            libdvbpsi_log(param, DVBINFO_LOG_ERROR, "Could not create temporary summary file %s\n",
                          param->summary.file);
            return err;
        }
        deadline = mdate() + param->summary.period;
    }

    /* Create the MPEG-TS PSI decoders. */
    ts_stream_t *stream = libdvbpsi_init(param->debug, &libdvbpsi_log, (void *)param);
    if (!stream)
        goto out;

    while (!b_error)
    {
        /* Wait till fifo has emptied */
        if (!capture->b_alive && (fifo_count(capture->fifo) == 0))
            break;

        /* Wait for data to arrive */
        buffer = fifo_pop(capture->fifo);
        if (buffer == NULL)
            continue;

        if (param->output)
        {
            /* Must be signed: the callback returns -1 on error, which an
             * unsigned variable would turn into a huge "success" count. */
            ssize_t written = param->pf_write(param->fd_out, buffer->p_data, buffer->i_size);
            if (written < 0) /* error writing */
            {
                libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                              "error (%d) writting to %s", errno, param->output);
                b_error = true;
                break;
            }
            else if ((size_t)written < buffer->i_size) /* short write, disk full? */
            {
                libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                              "error writting to %s (disk full?)", param->output);
                b_error = true;
                break;
            }
        }

        if (!libdvbpsi_process(stream, buffer->p_data, buffer->i_size, buffer->i_date))
            b_error = true;

        /* summary statistics */
        if (param->b_summary)
        {
            if (mdate() >= deadline)
            {
                FILE *fd = fopen(psz_temp, "w+");
                if (fd)
                {
                    libdvbpsi_summary(fd, stream, param->summary.mode);
                    fflush(fd);
                    fclose(fd);
                    /* Replace the previous summary with the new one. */
                    unlink(param->summary.file);
                    rename(psz_temp, param->summary.file);
                }
                else
                {
                    libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                                  "failed opening summary file (disabling summary logging)\n");
                    param->b_summary = false;
                }
                deadline = mdate() + param->summary.period;
            }
        }

        /* reuse buffer */
        fifo_push(capture->empty, buffer);
        buffer = NULL;

        /* check fifo size: release the capture thread if it is waiting */
        if (fifo_size(capture->fifo) < FIFO_THRESHOLD_SIZE)
        {
            pthread_mutex_lock(&capture->lock);
            capture->b_fifo_full = false;
            pthread_cond_signal(&capture->fifo_full);
            pthread_mutex_unlock(&capture->lock);
        }
    }

    /* The fifo is only guaranteed to be drained on a normal exit; after an
     * early bail-out there may still be queued buffers. */
    assert(b_error || fifo_count(capture->fifo) == 0);
    libdvbpsi_exit(stream);
    if (!b_error)
        err = 0;

out:
    if (b_error)
        libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error while processing\n" );

    /* Non-NULL only when the loop was left while still holding a buffer. */
    if (buffer) buffer_free(buffer);
    free(psz_temp);
    return err;
}


广告 广告

评论区