Andreas Rheinhardt <andreas.rheinha...@outlook.com> 于2022年8月31日周三 10:54写道: > > Lukas Fellechner: > > This patch adds an "init-threads" option, specifying the max > > number of threads to use. Multiple worker threads are spun up > > to massively bring down init times. > > --- > > libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++- > > 1 file changed, 350 insertions(+), 1 deletion(-) > > > > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c > > index e82da45e43..20f2557ea3 100644 > > --- a/libavformat/dashdec.c > > +++ b/libavformat/dashdec.c > > @@ -24,6 +24,7 @@ > > #include "libavutil/opt.h" > > #include "libavutil/time.h" > > #include "libavutil/parseutils.h" > > +#include "libavutil/thread.h" > > #include "internal.h" > > #include "avio_internal.h" > > #include "dash.h" > > @@ -152,6 +153,8 @@ typedef struct DASHContext { > > int max_url_size; > > char *cenc_decryption_key; > > > > + int init_threads; > > + > > /* Flags for init section*/ > > int is_init_section_common_video; > > int is_init_section_common_audio; > > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char > > *key, char **value) > > } > > } > > > > +#if HAVE_THREADS > > + > > +struct work_pool_data > > +{ > > + AVFormatContext *ctx; > > + struct representation *pls; > > + struct representation *common_pls; > > + pthread_mutex_t *common_mutex; > > + pthread_cond_t *common_condition; > > + int is_common; > > + int is_started; > > + int result; > > +}; > > + > > +struct thread_data > > This is against our naming conventions: CamelCase for struct tags and > typedefs, lowercase names with underscore for variable names. 
> > > +{ > > + pthread_t thread; > > + pthread_mutex_t *mutex; > > + struct work_pool_data *work_pool; > > + int work_pool_size; > > + int is_started; > > + int has_error; > > +}; > > + > > +static void *worker_thread(void *ptr) > > +{ > > + int ret = 0; > > + int i; > > + struct thread_data *thread_data = (struct thread_data*)ptr; > > + struct work_pool_data *work_pool = NULL; > > + struct work_pool_data *data = NULL; > > + for (;;) { > > + > > + // get next work item unless there was an error > > + pthread_mutex_lock(thread_data->mutex); > > + data = NULL; > > + if (!thread_data->has_error) { > > + work_pool = thread_data->work_pool; > > + for (i = 0; i < thread_data->work_pool_size; i++) { > > + if (!work_pool->is_started) { > > + data = work_pool; > > + data->is_started = 1; > > + break; > > + } > > + work_pool++; > > + } > > + } > > + pthread_mutex_unlock(thread_data->mutex); > > + > > + if (!data) { > > + // no more work to do > > + return NULL; > > + } > > + > > + // if we are common section provider, init and signal > > + if (data->is_common) { > > + data->pls->parent = data->ctx; > > + ret = update_init_section(data->pls); > > + if (ret < 0) { > > + pthread_cond_signal(data->common_condition); > > + goto end; > > + } > > + else > > + ret = AVERROR(pthread_cond_signal(data->common_condition)); > > + } > > + > > + // if we depend on common section provider, wait for signal and > > copy > > + if (data->common_pls) { > > + ret = AVERROR(pthread_cond_wait(data->common_condition, > > data->common_mutex)); > > + if (ret < 0) > > + goto end; > > + > > + if (!data->common_pls->init_sec_buf) { > > + goto end; > > + ret = AVERROR(EFAULT); > > + } > > + > > + ret = copy_init_section(data->pls, data->common_pls); > > + if (ret < 0) > > + goto end; > > + } > > + > > + ret = begin_open_demux_for_component(data->ctx, data->pls); > > + if (ret < 0) > > + goto end; > > + > > + end: > > + data->result = ret; > > + > > + // notify error to other threads and exit > > + if (ret 
< 0) { > > + pthread_mutex_lock(thread_data->mutex); > > + thread_data->has_error = 1; > > + pthread_mutex_unlock(thread_data->mutex); > > + return NULL; > > + } > > + } > > + > > + > > + return NULL; > > +} > > + > > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index, > > + struct representation *pls, struct representation *common_pls, > > + struct work_pool_data *init_data, pthread_mutex_t *common_mutex, > > + pthread_cond_t *common_condition) > > +{ > > + init_data->ctx = ctx; > > + init_data->pls = pls; > > + init_data->pls->stream_index = stream_index; > > + init_data->common_condition = common_condition; > > + init_data->common_mutex = common_mutex; > > + init_data->result = -1; > > + > > + if (pls == common_pls) { > > + init_data->is_common = 1; > > + } > > + else if (common_pls) { > > + init_data->common_pls = common_pls; > > + } > > +} > > + > > +static int start_thread(struct thread_data *thread_data, > > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t > > *mutex) > > +{ > > + int ret; > > + > > + thread_data->mutex = mutex; > > + thread_data->work_pool = work_pool; > > + thread_data->work_pool_size = work_pool_size; > > + > > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, > > worker_thread, (void*)thread_data)); > > + if (ret == 0) > > + thread_data->is_started = 1; > > + > > + return ret; > > +} > > + > > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, > > int threads) > > +{ > > + DASHContext *c = s->priv_data; > > + int ret = 0; > > + int stream_index = 0; > > + int i; > > We allow "for (int i = 0;" > > > + > > + // alloc data > > + struct work_pool_data *init_data = (struct > > work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); > > + if (!init_data) > > + return AVERROR(ENOMEM); > > + > > + struct thread_data *thread_data = (struct > > thread_data*)av_mallocz(sizeof(struct thread_data) * threads); > > + if (!thread_data) > > + return 
AVERROR(ENOMEM); > > 1. init_data leaks here on error. > 2. In fact, it seems to me that both init_data and thread_data are > nowhere freed. > > > + > > + // alloc mutex and conditions > > + pthread_mutex_t work_pool_mutex; > > + > > + pthread_mutex_t common_video_mutex; > > + pthread_cond_t common_video_cond; > > + > > + pthread_mutex_t common_audio_mutex; > > + pthread_cond_t common_audio_cond; > > + > > + pthread_mutex_t common_subtitle_mutex; > > + pthread_cond_t common_subtitle_cond; > > + > > + // init mutex and conditions > > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + if (c->is_init_section_common_video) { > > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + if (c->is_init_section_common_audio) { > > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + if (c->is_init_section_common_subtitle) { > > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + > > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); > > + if (ret < 0) > > + goto cleanup; > > + } > > + > > + // init work pool data > > + struct work_pool_data* current_data = init_data; > > + > > + for (i = 0; i < c->n_videos; i++) { > > + create_work_pool_data(s, stream_index, c->videos[i], > > + c->is_init_section_common_video ? 
c->videos[0] : NULL, > > + current_data, &common_video_mutex, &common_video_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_audios; i++) { > > + create_work_pool_data(s, stream_index, c->audios[i], > > + c->is_init_section_common_audio ? c->audios[0] : NULL, > > + current_data, &common_audio_mutex, &common_audio_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > + > > + for (i = 0; i < c->n_subtitles; i++) { > > + create_work_pool_data(s, stream_index, c->subtitles[i], > > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, > > + current_data, &common_subtitle_mutex, &common_subtitle_cond); > > + > > + stream_index++; > > + current_data++; > > + } > > This is very repetitive. > > > + > > + // start threads > > + struct thread_data *current_thread = thread_data; > > + for (i = 0; i < threads; i++) { > > + ret = start_thread(current_thread, init_data, nstreams, > > &work_pool_mutex); > > + if (ret < 0) > > + goto cleanup; > > + > > + current_thread++; > > + } > > + > > +cleanup: > > + // we need to cleanup even in case of errors, so we need to store > > results of init, run and cleanup > > + int initResult = ret; > > + int runResult = 0; > > + int cleanupResult = 0; > > + > > + // join threads > > + current_thread = thread_data; > > + for (i = 0; i < threads; i++) { > > + if (current_thread->is_started) { > > + ret = AVERROR(pthread_join(current_thread->thread, NULL)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + current_thread++; > > + } > > + > > + // finalize streams and collect results > > + current_data = init_data; > > + for (i = 0; i < nstreams; i++) { > > + if (current_data->result < 0) { > > + // thread ran into error: collect result and break > > + runResult = current_data->result; > > + break; > > + } > > + else { > > + // thread success: create streams on AVFormatContext > > + ret = end_open_demux_for_component(s, current_data->pls); > > + if (ret < 0) > > + 
runResult = ret; > > + } > > + current_data++; > > + } > > + > > + // cleanup mutex and conditions > > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + if (c->is_init_section_common_video) { > > + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + if (c->is_init_section_common_audio) { > > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + if (c->is_init_section_common_subtitle) { > > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); > > + if (ret < 0) > > + cleanupResult = ret; > > + > > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); > > + if (ret < 0) > > + cleanupResult = ret; > > + } > > + > > + // return results if errors have occured in one of the phases > > + if (initResult < 0) > > + return initResult; > > + > > + if (runResult < 0) > > + return runResult; > > + > > + if (cleanupResult < 0) > > + return cleanupResult; > > + > > + return 0; > > +} > > + > > +#endif > > + > > static int dash_read_header(AVFormatContext *s) > > { > > DASHContext *c = s->priv_data; > > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s) > > if (c->n_subtitles) > > c->is_init_section_common_subtitle = > > is_common_init_section_exist(c->subtitles, c->n_subtitles); > > > > + int threads = 0; > > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; > > + > > +#if HAVE_THREADS > > + threads = FFMIN(nstreams, c->init_threads); > > +#endif > > + > > + if (threads > 1) > > + { > > +#if HAVE_THREADS > > + ret = init_streams_multithreaded(s, nstreams, threads); > > + if (ret < 0) > > + return 
ret; > > +#endif > > + } > > + else > > + { > > /* Open the demuxer for video and audio components if available */ > > for (i = 0; i < c->n_videos; i++) { > > rep = c->videos[i]; > > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s) > > > > if (!stream_index) > > return AVERROR_INVALIDDATA; > > + } > > > > /* Create a program */ > > program = av_new_program(s, 0); > > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = { > > OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, > > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, > > INT_MIN, INT_MAX, FLAGS}, > > - { "cenc_decryption_key", "Media decryption key (hex)", > > OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, > > INT_MAX, .flags = FLAGS }, > > + { "cenc_decryption_key", "Media decryption key (hex)", > > OFFSET(cenc_decryption_key), > > + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = > > FLAGS }, > > + { "init_threads", "Number of threads to use for initializing the DASH > > stream", > > + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, > > {NULL} > > }; > > > > -- > > 2.31.1.windows.1 > > > > 1. We actually have an API to process multiple tasks by different > threads: Look at libavutil/slicethread.h. Why can't you reuse that? I have seen that API usually used in avfilters for slice multithreading, or am I misunderstanding something?
> 2. In case initialization of one of the conditions/mutexes fails, you > are nevertheless destroying them; you are even destroying completely > uninitialized mutexes. This is undefined behaviour. Checking the result > of it does not fix this. > > - Andreas Thanks Steven _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".