This patch adds an "init_threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times. --- libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 285 insertions(+), 1 deletion(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index e82da45e43..0532e2c918 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,8 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" +#include "libavutil/slicethread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +154,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +#if HAVE_THREADS + +typedef struct WorkPoolData +{ + AVFormatContext *ctx; + struct representation *pls; + struct representation *common_pls; + pthread_mutex_t *common_mutex; + pthread_cond_t *common_condition; + int is_common; + int is_started; + int result; +} WorkPoolData; + +static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads) +{ + WorkPoolData *work_pool = (WorkPoolData*)priv; + WorkPoolData *data = work_pool + jobnr; + int ret; + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + +end: + data->result = ret; +} + +static void create_work_pool_data(AVFormatContext *ctx, int *stream_index, + struct representation **streams, int num_streams, int is_init_section_common, + WorkPoolData *work_pool, pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + work_pool += *stream_index; + + for (int i = 0; i < num_streams; i++) { + work_pool->ctx = ctx; + work_pool->pls = streams[i]; + work_pool->pls->stream_index = *stream_index; + work_pool->common_condition = common_condition; + work_pool->common_mutex = common_mutex; + work_pool->result = -1; + + if (is_init_section_common) { + if (i == 0) + work_pool->is_common = 1; + else + work_pool->common_pls = streams[0]; + } + + work_pool++; + *stream_index = *stream_index + 1; + } +} + +static pthread_mutex_t* create_mutex() +{ + pthread_mutex_t* mutex = (pthread_mutex_t*)av_malloc(sizeof(pthread_mutex_t)); + if (!mutex) + return NULL; + + if (pthread_mutex_init(mutex, NULL)) { + av_free(mutex); + return NULL; + } + + return mutex; +} + +static int free_mutex(pthread_mutex_t **mutex) +{ + int ret = 0; + if (*mutex) { + ret = pthread_mutex_destroy(*mutex); + av_free(*mutex); + *mutex = NULL; + } + return ret; +} + +static pthread_cond_t* create_cond() +{ + pthread_cond_t* cond = (pthread_cond_t*)av_malloc(sizeof(pthread_cond_t)); + if (!cond) + return NULL; + + if (pthread_cond_init(cond, NULL)) { + av_free(cond); + return NULL; + } + + return cond; +} + +static int free_cond(pthread_cond_t **cond) +{ + int ret = 0; + if (*cond) { + ret = pthread_cond_destroy(*cond); + av_free(*cond); + *cond = NULL; + } + return ret; +} + +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) +{ + DASHContext *c = s->priv_data; + int ret = 0; + int stream_index = 0; + AVSliceThread *slice_thread; + + // we need to cleanup even in case of errors, + // so we need to store results of run and cleanup phase + int initResult = 0; + int runResult = 0; + int cleanupResult = 0; + + // alloc data + WorkPoolData *work_pool = (WorkPoolData*)av_mallocz( + sizeof(WorkPoolData) * nstreams); + if (!work_pool) + return AVERROR(ENOMEM); + + if (!avpriv_slicethread_create(&slice_thread, (void*)work_pool, &thread_worker, NULL, threads)) { + av_free(work_pool); + return AVERROR(ENOMEM); +} + + // alloc mutex and conditions + c->init_mutex = create_mutex(); + + pthread_mutex_t *common_video_mutex = create_mutex(); + pthread_cond_t *common_video_cond = create_cond(); + + pthread_mutex_t *common_audio_mutex = create_mutex(); + pthread_cond_t *common_audio_cond = create_cond(); + + pthread_mutex_t *common_subtitle_mutex = create_mutex(); + pthread_cond_t *common_subtitle_cond = create_cond(); + + if (!(c->init_mutex && common_video_mutex && common_video_cond && common_audio_mutex && + common_audio_cond && common_subtitle_mutex && common_subtitle_cond)) { + initResult = AVERROR(ENOMEM); + goto cleanup; + } + + // set work pool data + create_work_pool_data(s, &stream_index, c->videos, c->n_videos, + c->is_init_section_common_video, work_pool, + common_video_mutex, common_video_cond); + + create_work_pool_data(s, &stream_index, c->audios, c->n_audios, + c->is_init_section_common_audio, work_pool, + common_audio_mutex, common_audio_cond); + + create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles, + c->is_init_section_common_subtitle, work_pool, + common_subtitle_mutex, common_subtitle_cond); + + // run threads + avpriv_slicethread_execute(slice_thread, nstreams, 0); + + // finalize streams and collect results + WorkPoolData* current_data = work_pool; + for (int i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result and break + runResult = current_data->result; + break; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) { + runResult = ret; + break; + } + } + current_data++; + } + +cleanup: + // cleanup mutex and conditions + ret = free_mutex(&c->init_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_video_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_video_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_audio_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_audio_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_subtitle_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_subtitle_cond); + if (ret < 0) + cleanupResult = ret; + + // cleanup threads and workpool + av_free(work_pool); + avpriv_slicethread_free(&slice_thread); + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; + + return 0; +} + +#endif + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s) if (c->n_subtitles) c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + int threads = 1; + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + +#if HAVE_THREADS + threads = FFMIN(nstreams, c->init_threads); +#endif + + if (threads > 1) + { +#if HAVE_THREADS + ret = init_streams_multithreaded(s, nstreams, threads); + if (ret < 0) + return ret; +#endif + } + else + { /* Open the demuxer for video and audio components if available */ for (i = 0; i < c->n_videos; i++) { rep = c->videos[i]; @@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s) if (!stream_index) return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = { OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH stream", + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS }, {NULL} }; -- 2.28.0.windows.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".