Lukas Fellechner <lukas.fellech...@gmx.net> 于2022年8月21日周日 05:54写道: > > Trying with inline PATCH since attached file was not showing up... > > --- > > From: Lukas Fellechner <lukas.fellech...@gmx.net> > Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization > > Initializing DASH streams is currently slow, because each individual stream > is opened and probed sequentially. With DASH streams often having somewhere > between 10-20 streams, this can easily take up to half a minute. This patch > adds an "init-threads" option, specifying the max number of threads to use. > Multiple worker threads are spun up to massively bring down init times. > --- > libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++----- > 1 file changed, 375 insertions(+), 46 deletions(-) > > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c > index 63bf7e96a5..69a6c2ba79 100644 > --- a/libavformat/dashdec.c > +++ b/libavformat/dashdec.c > @@ -24,6 +24,7 @@ > #include "libavutil/opt.h" > #include "libavutil/time.h" > #include "libavutil/parseutils.h" > +#include "libavutil/thread.h" > #include "internal.h" > #include "avio_internal.h" > #include "dash.h" > @@ -152,6 +153,8 @@ typedef struct DASHContext { > int max_url_size; > char *cenc_decryption_key; > > + int init_threads; > + > /* Flags for init section*/ > int is_init_section_common_video; > int is_init_section_common_audio; > @@ -1918,22 +1921,40 @@ fail: > return ret; > } > > -static int open_demux_for_component(AVFormatContext *s, struct > representation *pls) > +static int open_demux_for_component(AVFormatContext* s, struct > representation* pls) > +{ > + int ret = 0; > + > + ret = begin_open_demux_for_component(s, pls); > + if (ret < 0) > + return ret; > + > + ret = end_open_demux_for_component(s, pls); > + > + return ret; > +} > + > +static int begin_open_demux_for_component(AVFormatContext* s, struct > representation* pls) > { > int ret = 0; > - int i; > > pls->parent = s; > - pls->cur_seq_no = calc_cur_seg_no(s, pls); > + pls->cur_seq_no = calc_cur_seg_no(s, pls); > > if (!pls->last_seq_no) { > pls->last_seq_no = calc_max_seg_no(pls, s->priv_data); > } > > ret = reopen_demux_for_component(s, pls); > - if (ret < 0) { > - goto fail; > - } > + > + return ret; > +} > + > +static int end_open_demux_for_component(AVFormatContext* s, struct > representation* pls) > +{ > + int ret = 0; > + int i; > + > for (i = 0; i < pls->ctx->nb_streams; i++) { > AVStream *st = avformat_new_stream(s, NULL); > AVStream *ist = pls->ctx->streams[i]; > @@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char > *key, char **value) > } > } > > +struct work_pool_data > +{ > + AVFormatContext* ctx; > + struct representation* pls; > + struct representation* common_pls; > + pthread_mutex_t* common_mutex; > + pthread_cond_t* common_condition; Should add #if HAVE_THREADS to check if the pthread supported. > + int is_common; > + int is_started; > + int result; > +}; > + > +struct thread_data > +{ > + pthread_t thread; > + pthread_mutex_t* mutex; > + struct work_pool_data* work_pool; > + int work_pool_size; > + int is_started; > +}; > + > +static void *worker_thread(void *ptr) > +{ > + int ret = 0; > + int i; > + struct thread_data* thread_data = (struct thread_data*)ptr; > + struct work_pool_data* work_pool = NULL; > + struct work_pool_data* data = NULL; > + for (;;) { > + > + // get next work item > + pthread_mutex_lock(thread_data->mutex); > + data = NULL; > + work_pool = thread_data->work_pool; > + for (i = 0; i < thread_data->work_pool_size; i++) { > + if (!work_pool->is_started) { > + data = work_pool; > + data->is_started = 1; > + break; > + } > + work_pool++; > + } > + pthread_mutex_unlock(thread_data->mutex); > + > + if (!data) { > + // no more work to do > + return NULL; > + } > + > + // if we are common section provider, init and signal > + if (data->is_common) { > + data->pls->parent = data->ctx; > + ret = update_init_section(data->pls); > + if (ret < 0) { > + pthread_cond_signal(data->common_condition); > + goto end; > + } > + else > + ret = AVERROR(pthread_cond_signal(data->common_condition)); > + } > + > + // if we depend on common section provider, wait for signal and copy > + if (data->common_pls) { > + ret = AVERROR(pthread_cond_wait(data->common_condition, > data->common_mutex)); > + if (ret < 0) > + goto end; > + > + if (!data->common_pls->init_sec_buf) { > + goto end; > + ret = AVERROR(EFAULT); > + } > + > + ret = copy_init_section(data->pls, data->common_pls); > + if (ret < 0) > + goto end; > + } > + > + ret = begin_open_demux_for_component(data->ctx, data->pls); > + if (ret < 0) > + goto end; > + > + end: > + data->result = ret; > + } > + > + > + return NULL; > +} > + > +static void create_work_pool_data(AVFormatContext* ctx, int stream_index, > + struct representation* pls, struct representation* common_pls, > + struct work_pool_data* init_data, pthread_mutex_t* common_mutex, > + pthread_cond_t* common_condition) > +{ > + init_data->ctx = ctx; > + init_data->pls = pls; > + init_data->pls->stream_index = stream_index; > + init_data->common_condition = common_condition; > + init_data->common_mutex = common_mutex; > + init_data->result = -1; > + > + if (pls == common_pls) { > + init_data->is_common = 1; > + } > + else if (common_pls) { > + init_data->common_pls = common_pls; > + } > +} > + > +static int start_thread(struct thread_data *thread_data, > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t > *mutex) > +{ > + int ret; > + > + thread_data->mutex = mutex; > + thread_data->work_pool = work_pool; > + thread_data->work_pool_size = work_pool_size; > + > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, > (void*)thread_data)); > + if (ret == 0) > + thread_data->is_started = 1; > + > + return ret; > +} > + > static int dash_read_header(AVFormatContext *s) > { > DASHContext *c = s->priv_data; > @@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s) > av_dict_set(&c->avio_opts, "seekable", "0", 0); > } > > - if(c->n_videos) > + if (c->n_videos) > c->is_init_section_common_video = > is_common_init_section_exist(c->videos, c->n_videos); > > - /* Open the demuxer for video and audio components if available */ > - for (i = 0; i < c->n_videos; i++) { > - rep = c->videos[i]; > - if (i > 0 && c->is_init_section_common_video) { > - ret = copy_init_section(rep, c->videos[0]); > + if (c->n_audios) > + c->is_init_section_common_audio = > is_common_init_section_exist(c->audios, c->n_audios); > + > + if (c->n_subtitles) > + c->is_init_section_common_subtitle = > is_common_init_section_exist(c->subtitles, c->n_subtitles); > + > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; > + int threads = FFMIN(nstreams, c->init_threads); > + > + if (threads > 1) > + { > + // alloc data > + struct work_pool_data* init_data = (struct > work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > + if (!init_data) > + return AVERROR(ENOMEM); > + > + struct thread_data* thread_data = (struct > thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > + if (!thread_data) > + return AVERROR(ENOMEM); > + > + // alloc mutex and conditions > + pthread_mutex_t work_pool_mutex; > + > + pthread_mutex_t common_video_mutex; > + pthread_cond_t common_video_cond; > + > + pthread_mutex_t common_audio_mutex; > + pthread_cond_t common_audio_cond; > + > + pthread_mutex_t common_subtitle_mutex; > + pthread_cond_t common_subtitle_cond; > + > + // init mutex and conditions > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); > if (ret < 0) > - return ret; > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); > + if (ret < 0) > + goto cleanup; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > - } > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); > + if (ret < 0) > + goto cleanup; > > - if(c->n_audios) > - c->is_init_section_common_audio = > is_common_init_section_exist(c->audios, c->n_audios); > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); > + if (ret < 0) > + goto cleanup; > + } > > - for (i = 0; i < c->n_audios; i++) { > - rep = c->audios[i]; > - if (i > 0 && c->is_init_section_common_audio) { > - ret = copy_init_section(rep, c->audios[0]); > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); > if (ret < 0) > - return ret; > + goto cleanup; > + > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); > + if (ret < 0) > + goto cleanup; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > - } > + // init work pool data > + struct work_pool_data* current_data = init_data; > > - if (c->n_subtitles) > - c->is_init_section_common_subtitle = > is_common_init_section_exist(c->subtitles, c->n_subtitles); > + for (i = 0; i < c->n_videos; i++) { > + create_work_pool_data(s, stream_index, c->videos[i], > + c->is_init_section_common_video ? c->videos[0] : NULL, > + current_data, &common_video_mutex, &common_video_cond); > > - for (i = 0; i < c->n_subtitles; i++) { > - rep = c->subtitles[i]; > - if (i > 0 && c->is_init_section_common_subtitle) { > - ret = copy_init_section(rep, c->subtitles[0]); > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_audios; i++) { > + create_work_pool_data(s, stream_index, c->audios[i], > + c->is_init_section_common_audio ? c->audios[0] : NULL, > + current_data, &common_audio_mutex, &common_audio_cond); > + > + stream_index++; > + current_data++; > + } > + > + for (i = 0; i < c->n_subtitles; i++) { > + create_work_pool_data(s, stream_index, c->subtitles[i], > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > + > + stream_index++; > + current_data++; > + } > + > + // start threads > + struct thread_data* current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + ret = start_thread(current_thread, init_data, nstreams, > &work_pool_mutex); > if (ret < 0) > - return ret; > + goto cleanup; > + > + current_thread++; > } > - ret = open_demux_for_component(s, rep); > > - if (ret) > - return ret; > - rep->stream_index = stream_index; > - ++stream_index; > + cleanup: > + // we need to cleanup even in case of errors, so we need to store > results of init, run and cleanup > + int initResult = ret; > + int runResult = 0; > + int cleanupResult = 0; > + > + // join threads > + current_thread = thread_data; > + for (i = 0; i < threads; i++) { > + if (current_thread->is_started) { > + ret = AVERROR(pthread_join(current_thread->thread, NULL)); > + if (ret < 0) > + cleanupResult = ret; > + } > + current_thread++; > + } > + > + // finalize streams and collect results > + current_data = init_data; > + for (i = 0; i < nstreams; i++) { > + if (current_data->result < 0) { > + // thread ran into error: collect result > + runResult = current_data->result; > + } > + else { > + // thread success: create streams on AVFormatContext > + ret = end_open_demux_for_component(s, current_data->pls); > + if (ret < 0) > + runResult = ret; > + } > + current_data++; > + } > + > + // cleanup mutex and conditions > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + if (c->is_init_section_common_video) { > + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_audio) { > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + if (c->is_init_section_common_subtitle) { > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); > + if (ret < 0) > + cleanupResult = ret; > + > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); > + if (ret < 0) > + cleanupResult = ret; > + } > + > + // return results if errors have occured in one of the phases > + if (initResult < 0) > + return initResult; > + > + if (runResult < 0) > + return runResult; > + > + if (cleanupResult < 0) > + return cleanupResult; > } > + else > + { > + /* Open the demuxer for video and audio components if available */ > + for (i = 0; i < c->n_videos; i++) { > + rep = c->videos[i]; > + if (i > 0 && c->is_init_section_common_video) { > + ret = copy_init_section(rep, c->videos[0]); > + if (ret < 0) > + return ret; > + } > + ret = open_demux_for_component(s, rep); > > - if (!stream_index) > - return AVERROR_INVALIDDATA; > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + for (i = 0; i < c->n_audios; i++) { > + rep = c->audios[i]; > + if (i > 0 && c->is_init_section_common_audio) { > + ret = copy_init_section(rep, c->audios[0]); > + if (ret < 0) > + return ret; > + } > + ret = open_demux_for_component(s, rep); > + > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + for (i = 0; i < c->n_subtitles; i++) { > + rep = c->subtitles[i]; > + if (i > 0 && c->is_init_section_common_subtitle) { > + ret = copy_init_section(rep, c->subtitles[0]); > + if (ret < 0) > + return ret; > + } > + ret = open_demux_for_component(s, rep); > + > + if (ret) > + return ret; > + rep->stream_index = stream_index; > + ++stream_index; > + } > + > + if (!stream_index) > + return AVERROR_INVALIDDATA; > + } > > /* Create a program */ > program = av_new_program(s, 0); > @@ -2349,6 +2677,7 @@ static const AVOption dash_options[] = { > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, > INT_MIN, INT_MAX, FLAGS}, > { "cenc_decryption_key", "Media decryption key (hex)", > OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, > INT_MAX, .flags = FLAGS }, > + { "init_threads", "Number of threads to use for initializing the DASH > stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, > {NULL} > }; > > -- > 2.31.1.windows.1 > > > _______________________________________________ > ffmpeg-devel mailing list > ffmpeg-devel@ffmpeg.org > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel > > To unsubscribe, visit link above, or email > ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe". _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".