This commit should have no functional effect on its own, but will be useful in following commits. --- libavcodec/pthread_frame.c | 253 +++++++++++++++++++++++-------------- 1 file changed, 155 insertions(+), 98 deletions(-)
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c index 1b1b96623f..78e6cf668b 100644 --- a/libavcodec/pthread_frame.c +++ b/libavcodec/pthread_frame.c @@ -59,12 +59,6 @@ enum { STATE_SETUP_FINISHED, }; -enum { - UNINITIALIZED, ///< Thread has not been created, AVCodec->close mustn't be called - NEEDS_CLOSE, ///< FFCodec->close needs to be called - INITIALIZED, ///< Thread has been properly set up -}; - typedef struct DecodedFrames { AVFrame **f; size_t nb_f; @@ -82,7 +76,8 @@ typedef struct PerThreadContext { struct FrameThreadContext *parent; pthread_t thread; - int thread_init; + int thread_started; + unsigned pthread_init_cnt;///< Number of successfully initialized mutexes/conditions pthread_cond_t input_cond; ///< Used to wait for a new packet from the main thread. pthread_cond_t progress_cond; ///< Used by child threads to wait for progress to change. @@ -108,12 +103,6 @@ typedef struct PerThreadContext { int hwaccel_serializing; int async_serializing; - // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used; - // cannot check hwaccel caps directly, because - // worked threads clear hwaccel state for thread-unsafe hwaccels - // after each decode call - int hwaccel_threadsafe; - atomic_int debug_threads; ///< Set if the FF_DEBUG_THREADS option is set. /// The following two fields have the same semantics as the DecodeContext field @@ -121,10 +110,32 @@ typedef struct PerThreadContext { enum AVPictureType initial_pict_type; } PerThreadContext; +typedef struct ChildDecoder { + AVCodecContext *ctx; + int needs_close; + + // set to 1 in ff_thread_finish_setup() when a threadsafe hwaccel is used; + // cannot check hwaccel caps directly, because + // worked threads clear hwaccel state for thread-unsafe hwaccels + // after each decode call + int hwaccel_threadsafe; + + struct FrameThreadContext *parent; + + // The worker thread this decoder is currently assigned to. + // May change between individual decode calls. 
+ PerThreadContext *thread; +} ChildDecoder; + /** * Context stored in the client AVCodecInternal thread_ctx. */ typedef struct FrameThreadContext { + /** + * Decoder contexts that get assigned to frame threads. + */ + ChildDecoder *decoders; + PerThreadContext *threads; ///< The contexts for each thread. PerThreadContext *prev_thread; ///< The last thread submit_packet() was called on. @@ -413,8 +424,8 @@ FF_ENABLE_DEPRECATION_WARNINGS if (codec->update_thread_context_for_user) err = codec->update_thread_context_for_user(dst, src); } else { - const PerThreadContext *p_src = src->internal->thread_ctx; - PerThreadContext *p_dst = dst->internal->thread_ctx; + const ChildDecoder *cd_src = src->internal->thread_ctx; + ChildDecoder *cd_dst = dst->internal->thread_ctx; if (codec->update_thread_context) { err = codec->update_thread_context(dst, src); @@ -423,16 +434,16 @@ FF_ENABLE_DEPRECATION_WARNINGS } // reset dst hwaccel state if needed - av_assert0(p_dst->hwaccel_threadsafe || + av_assert0(cd_dst->hwaccel_threadsafe || (!dst->hwaccel && !dst->internal->hwaccel_priv_data)); - if (p_dst->hwaccel_threadsafe && - (!p_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) { + if (cd_dst->hwaccel_threadsafe && + (!cd_src->hwaccel_threadsafe || dst->hwaccel != src->hwaccel)) { ff_hwaccel_uninit(dst); - p_dst->hwaccel_threadsafe = 0; + cd_dst->hwaccel_threadsafe = 0; } // propagate hwaccel state for threadsafe hwaccels - if (p_src->hwaccel_threadsafe) { + if (cd_src->hwaccel_threadsafe) { const FFHWAccel *hwaccel = ffhwaccel(src->hwaccel); if (!dst->hwaccel) { if (hwaccel->priv_data_size) { @@ -455,7 +466,7 @@ FF_ENABLE_DEPRECATION_WARNINGS return err; } } - p_dst->hwaccel_threadsafe = 1; + cd_dst->hwaccel_threadsafe = 1; } } @@ -503,6 +514,7 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, AVPacket *in_pkt) { FrameThreadContext *fctx = p->parent; + ChildDecoder *cd = p->avctx->internal->thread_ctx; PerThreadContext *prev_thread = 
fctx->prev_thread; const AVCodec *codec = p->avctx->codec; int ret; @@ -546,8 +558,8 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, } /* transfer the stashed hwaccel state, if any */ - av_assert0(!p->avctx->hwaccel || p->hwaccel_threadsafe); - if (!p->hwaccel_threadsafe) { + av_assert0(!p->avctx->hwaccel || cd->hwaccel_threadsafe); + if (!cd->hwaccel_threadsafe) { FFSWAP(const AVHWAccel*, p->avctx->hwaccel, fctx->stash_hwaccel); FFSWAP(void*, p->avctx->hwaccel_context, fctx->stash_hwaccel_context); FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv); @@ -626,6 +638,7 @@ finish: void ff_thread_report_progress(ThreadFrame *f, int n, int field) { + ChildDecoder *cd; PerThreadContext *p; atomic_int *progress = f->progress ? f->progress->progress : NULL; @@ -633,7 +646,8 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field) atomic_load_explicit(&progress[field], memory_order_relaxed) >= n) return; - p = f->owner[field]->internal->thread_ctx; + cd = f->owner[field]->internal->thread_ctx; + p = cd->thread; if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed)) av_log(f->owner[field], AV_LOG_DEBUG, @@ -649,6 +663,7 @@ void ff_thread_report_progress(ThreadFrame *f, int n, int field) void ff_thread_await_progress(const ThreadFrame *f, int n, int field) { + ChildDecoder *cd; PerThreadContext *p; atomic_int *progress = f->progress ? 
f->progress->progress : NULL; @@ -656,7 +671,8 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field) atomic_load_explicit(&progress[field], memory_order_acquire) >= n) return; - p = f->owner[field]->internal->thread_ctx; + cd = f->owner[field]->internal->thread_ctx; + p = cd->thread; if (atomic_load_explicit(&p->debug_threads, memory_order_relaxed)) av_log(f->owner[field], AV_LOG_DEBUG, @@ -669,14 +685,16 @@ void ff_thread_await_progress(const ThreadFrame *f, int n, int field) } void ff_thread_finish_setup(AVCodecContext *avctx) { + ChildDecoder *cd; PerThreadContext *p; if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return; - p = avctx->internal->thread_ctx; + cd = avctx->internal->thread_ctx; + p = cd->thread; - p->hwaccel_threadsafe = avctx->hwaccel && - (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE); + cd->hwaccel_threadsafe = avctx->hwaccel && + (ffhwaccel(avctx->hwaccel)->caps_internal & HWACCEL_CAP_THREAD_SAFE); if (hwaccel_serial(avctx) && !p->hwaccel_serializing) { pthread_mutex_lock(&p->parent->hwaccel_mutex); @@ -716,13 +734,14 @@ void ff_thread_finish_setup(AVCodecContext *avctx) { /// Waits for all threads to finish. 
static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count) { - int i; - async_unlock(fctx); - for (i = 0; i < thread_count; i++) { + for (int i = 0; i < thread_count && fctx->threads; i++) { PerThreadContext *p = &fctx->threads[i]; + if (!p->thread_started) + break; + if (atomic_load(&p->state) != STATE_INPUT_READY) { pthread_mutex_lock(&p->progress_mutex); while (atomic_load(&p->state) != STATE_INPUT_READY) @@ -750,24 +769,35 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) { FrameThreadContext *fctx = avctx->internal->thread_ctx; const FFCodec *codec = ffcodec(avctx->codec); - int i; park_frame_worker_threads(fctx, thread_count); - for (i = 0; i < thread_count; i++) { + // clean up per-thread contexts + for (int i = 0; i < thread_count && fctx->threads; i++) { PerThreadContext *p = &fctx->threads[i]; - AVCodecContext *ctx = p->avctx; + + if (p->thread_started) { + pthread_mutex_lock(&p->mutex); + p->die = 1; + pthread_cond_signal(&p->input_cond); + pthread_mutex_unlock(&p->mutex); + + pthread_join(p->thread, NULL); + } + + ff_pthread_free(p, per_thread_offsets); + av_packet_free(&p->avpkt); + decoded_frames_free(&p->df); + } + av_freep(&fctx->threads); + + // clean up child decoders + for (int i = 0; i < thread_count && fctx->decoders; i++) { + ChildDecoder *cd = &fctx->decoders[i]; + AVCodecContext *ctx = cd->ctx; if (ctx->internal) { - if (p->thread_init == INITIALIZED) { - pthread_mutex_lock(&p->mutex); - p->die = 1; - pthread_cond_signal(&p->input_cond); - pthread_mutex_unlock(&p->mutex); - - pthread_join(p->thread, NULL); - } - if (codec->close && p->thread_init != UNINITIALIZED) + if (codec->close && cd->needs_close) codec->close(ctx); /* When using a threadsafe hwaccel, this is where @@ -790,18 +820,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) &ctx->nb_decoded_side_data); } - decoded_frames_free(&p->df); - - ff_pthread_free(p, per_thread_offsets); - av_packet_free(&p->avpkt); - - 
av_freep(&p->avctx); + av_freep(&cd->ctx); } + av_freep(&fctx->decoders); decoded_frames_free(&fctx->df); av_packet_free(&fctx->next_pkt); - av_freep(&fctx->threads); ff_pthread_free(fctx, thread_ctx_offsets); /* if we have stashed hwaccel state, move it to the user-facing context, @@ -814,22 +839,13 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) av_freep(&avctx->internal->thread_ctx); } -static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, - FrameThreadContext *fctx, AVCodecContext *avctx, - const FFCodec *codec, int first) +static av_cold int dec_ctx_init(ChildDecoder *cd, int *decoders_to_free, + AVCodecContext *avctx, + const FFCodec *codec, int first) { AVCodecContext *copy; int err; - p->initial_pict_type = AV_PICTURE_TYPE_NONE; - if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) { - p->intra_only_flag = AV_FRAME_FLAG_KEY; - if (avctx->codec_type == AVMEDIA_TYPE_VIDEO) - p->initial_pict_type = AV_PICTURE_TYPE_I; - } - - atomic_init(&p->state, STATE_INPUT_READY); - copy = av_memdup(avctx, sizeof(*avctx)); if (!copy) return AVERROR(ENOMEM); @@ -837,19 +853,18 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, copy->decoded_side_data = NULL; copy->nb_decoded_side_data = 0; - /* From now on, this PerThreadContext will be cleaned up by + /* From now on, this ChildDecoder will be cleaned up by * ff_frame_thread_free in case of errors. 
*/ - (*threads_to_free)++; + (*decoders_to_free)++; - p->parent = fctx; - p->avctx = copy; + cd->ctx = copy; copy->internal = ff_decode_internal_alloc(); if (!copy->internal) return AVERROR(ENOMEM); ff_decode_internal_sync(copy, avctx); - copy->internal->thread_ctx = p; copy->internal->progress_frame_pool = avctx->internal->progress_frame_pool; + copy->internal->thread_ctx = cd; copy->delay = avctx->delay; @@ -866,13 +881,6 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, } } - err = ff_pthread_init(p, per_thread_offsets); - if (err < 0) - return err; - - if (!(p->avpkt = av_packet_alloc())) - return AVERROR(ENOMEM); - copy->internal->is_frame_mt = 1; if (!first) copy->internal->is_copy = 1; @@ -889,11 +897,11 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, err = codec->init(copy); if (err < 0) { if (codec->caps_internal & FF_CODEC_CAP_INIT_CLEANUP) - p->thread_init = NEEDS_CLOSE; + cd->needs_close = 1; return err; } } - p->thread_init = NEEDS_CLOSE; + cd->needs_close = 1; if (first) { update_context_from_thread(avctx, copy, 1); @@ -908,12 +916,40 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, } } - atomic_init(&p->debug_threads, (copy->debug & FF_DEBUG_THREADS) != 0); + return 0; +} - err = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p)); - if (err < 0) - return err; - p->thread_init = INITIALIZED; +static av_cold int init_thread(PerThreadContext *p, FrameThreadContext *fctx, + AVCodecContext *avctx, AVCodecContext *child) +{ + int ret = 0; + + p->initial_pict_type = AV_PICTURE_TYPE_NONE; + if (avctx->codec_descriptor->props & AV_CODEC_PROP_INTRA_ONLY) { + p->intra_only_flag = AV_FRAME_FLAG_KEY; + if (avctx->codec_type == AVMEDIA_TYPE_VIDEO) + p->initial_pict_type = AV_PICTURE_TYPE_I; + } + + atomic_init(&p->state, STATE_INPUT_READY); + + p->parent = fctx; + p->avctx = child; + + ret = ff_pthread_init(p, per_thread_offsets); + if (ret < 0) + return ret; + + 
p->avpkt = av_packet_alloc(); + if (!p->avpkt) + return AVERROR(ENOMEM); + + atomic_init(&p->debug_threads, (child->debug & FF_DEBUG_THREADS) != 0); + + ret = AVERROR(pthread_create(&p->thread, NULL, frame_worker_thread, p)); + if (ret < 0) + return ret; + p->thread_started = 1; return 0; } @@ -923,7 +959,7 @@ int ff_frame_thread_init(AVCodecContext *avctx) int thread_count = avctx->thread_count; const FFCodec *codec = ffcodec(avctx->codec); FrameThreadContext *fctx; - int err, i = 0; + int err, decoders_to_free = 0; if (!thread_count) { int nb_cpus = av_cpu_count(); @@ -959,17 +995,35 @@ int ff_frame_thread_init(AVCodecContext *avctx) if (codec->p.type == AVMEDIA_TYPE_VIDEO) avctx->delay = avctx->thread_count - 1; + fctx->decoders = av_calloc(thread_count, sizeof(*fctx->decoders)); + if (!fctx->decoders) { + err = AVERROR(ENOMEM); + goto error; + } + + for (; decoders_to_free < thread_count; ) { + ChildDecoder *cd = &fctx->decoders[decoders_to_free]; + int first = !decoders_to_free; + + cd->parent = fctx; + + err = dec_ctx_init(cd, &decoders_to_free, avctx, codec, first); + if (err < 0) + goto error; + } + fctx->threads = av_calloc(thread_count, sizeof(*fctx->threads)); if (!fctx->threads) { err = AVERROR(ENOMEM); goto error; } - for (; i < thread_count; ) { - PerThreadContext *p = &fctx->threads[i]; - int first = !i; + for (int i = 0; i < thread_count; i++) { + PerThreadContext *p = &fctx->threads[i]; - err = init_thread(p, &i, fctx, avctx, codec, first); + fctx->decoders[i].thread = p; + + err = init_thread(p, fctx, avctx, fctx->decoders[i].ctx); if (err < 0) goto error; } @@ -977,13 +1031,12 @@ int ff_frame_thread_init(AVCodecContext *avctx) return 0; error: - ff_frame_thread_free(avctx, i); + ff_frame_thread_free(avctx, decoders_to_free); return err; } void ff_thread_flush(AVCodecContext *avctx) { - int i; FrameThreadContext *fctx = avctx->internal->thread_ctx; if (!fctx) return; @@ -1000,23 +1053,24 @@ void ff_thread_flush(AVCodecContext *avctx) 
decoded_frames_flush(&fctx->df); fctx->result = 0; - for (i = 0; i < avctx->thread_count; i++) { + for (int i = 0; i < avctx->thread_count; i++) { PerThreadContext *p = &fctx->threads[i]; decoded_frames_flush(&p->df); p->result = 0; - - avcodec_flush_buffers(p->avctx); } + + for (int i = 0; i < avctx->thread_count; i++) + avcodec_flush_buffers(fctx->decoders[i].ctx); } int ff_thread_can_start_frame(AVCodecContext *avctx) { if ((avctx->active_thread_type & FF_THREAD_FRAME) && ffcodec(avctx->codec)->update_thread_context) { - PerThreadContext *p = avctx->internal->thread_ctx; + ChildDecoder *cd = avctx->internal->thread_ctx; - if (atomic_load(&p->state) != STATE_SETTING_UP) + if (atomic_load(&cd->thread->state) != STATE_SETTING_UP) return 0; } @@ -1025,13 +1079,15 @@ int ff_thread_can_start_frame(AVCodecContext *avctx) static int thread_get_buffer_internal(AVCodecContext *avctx, AVFrame *f, int flags) { + ChildDecoder *cd; PerThreadContext *p; int err; if (!(avctx->active_thread_type & FF_THREAD_FRAME)) return ff_get_buffer(avctx, f, flags); - p = avctx->internal->thread_ctx; + cd = avctx->internal->thread_ctx; + p = cd->thread; if (atomic_load(&p->state) != STATE_SETTING_UP && ffcodec(avctx->codec)->update_thread_context) { av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n"); @@ -1085,18 +1141,18 @@ void ff_thread_release_ext_buffer(ThreadFrame *f) enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset) { - PerThreadContext *p; + ChildDecoder *cd; const void *ref; if (!avctx->internal->is_copy) return avctx->active_thread_type & FF_THREAD_FRAME ? 
FF_THREAD_IS_FIRST_THREAD : FF_THREAD_NO_FRAME_THREADING; - p = avctx->internal->thread_ctx; + cd = avctx->internal->thread_ctx; av_assert1(memcpy(&ref, (char*)avctx->priv_data + offset, sizeof(ref)) && ref == NULL); - memcpy(&ref, (const char*)p->parent->threads[0].avctx->priv_data + offset, sizeof(ref)); + memcpy(&ref, (const char*)cd->parent->decoders[0].ctx->priv_data + offset, sizeof(ref)); av_assert1(ref); ff_refstruct_replace((char*)avctx->priv_data + offset, ref); @@ -1105,7 +1161,8 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset) int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt) { - PerThreadContext *p = avctx->internal->thread_ctx; + ChildDecoder *cd = avctx->internal->thread_ctx; + PerThreadContext *p = cd->thread; if (!AVPACKET_IS_EMPTY(p->avpkt)) { av_packet_move_ref(pkt, p->avpkt); -- 2.43.0 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".