+ return ret;
+}
+
+static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+{
+ int ret;
+ if (avctx->active_thread_type & FF_THREAD_FRAME)
+ ret = ff_thread_receive_frame(avctx, frame);
+ else
+ ret = ff_decode_receive_frame_internal(avctx, frame);
+
if (!ret) {
frame->best_effort_timestamp = guess_correct_pts(avctx,
frame->pts,
diff --git a/libavcodec/decode.h b/libavcodec/decode.h
index 906122b4a7..7ba8e3a332 100644
--- a/libavcodec/decode.h
+++ b/libavcodec/decode.h
@@ -58,6 +58,13 @@ typedef struct FrameDecodeData {
*/
int ff_decode_receive_frame(AVCodecContext *avctx, AVFrame *frame);
+/**
+ * Do the actual decoding and obtain a decoded frame from the decoder, if
+ * available. When frame threading is used, this is invoked by the worker
+ * threads, otherwise by the top layer directly.
+ */
+int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame);
+
/**
* Called by decoders to get the next packet for decoding.
*
diff --git a/libavcodec/internal.h b/libavcodec/internal.h
index a283c52e01..c87036efc4 100644
--- a/libavcodec/internal.h
+++ b/libavcodec/internal.h
@@ -56,6 +56,13 @@ typedef struct AVCodecInternal {
*/
int is_copy;
+ /**
+ * This field is set to 1 when frame threading is being used and the parent
+ * AVCodecContext of this AVCodecInternal is a worker-thread context (i.e.
+ * one of those actually doing the decoding), 0 otherwise.
+ */
+ int is_frame_mt;
+
/**
* An audio frame with less than required samples has been submitted (and
* potentially padded with silence). Reject all subsequent frames.
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 62a0b18a8a..d8182cb4b8 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -46,6 +46,7 @@
#include "libavutil/log.h"
#include "libavutil/mem.h"
#include "libavutil/opt.h"
+#include "libavutil/fifo.h"
#include "libavutil/thread.h"
enum {
@@ -73,6 +74,12 @@ enum {
INITIALIZED, ///< Thread has been properly set up
};
+typedef struct DecodedFrames {
+ AVFrame **f; ///< array of nb_f_allocated frames; the first nb_f hold decoded output
+ size_t nb_f; ///< number of frames currently holding decoded output
+ size_t nb_f_allocated; ///< number of AVFrames allocated in f
+} DecodedFrames;
+
/**
* Context used by codec threads and stored in their AVCodecInternal
thread_ctx.
*/
@@ -93,8 +100,10 @@ typedef struct PerThreadContext {
AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding).
- AVFrame *frame; ///< Output frame (for decoding) or input (for encoding).
- int got_frame; ///< The output of got_picture_ptr from
the last avcodec_decode_video() call.
+ /**
+ * Decoded frames from a single decode iteration.
+ */
+ DecodedFrames df;
int result; ///< The result of the last codec
decode/encode() call.
atomic_int state;
@@ -141,6 +150,14 @@ typedef struct FrameThreadContext {
pthread_cond_t async_cond;
int async_lock;
+ DecodedFrames df;
+ int result;
+
+ /**
+ * Packet to be submitted to the next thread for decoding.
+ */
+ AVPacket *next_pkt;
+
int next_decoding; ///< The next context to submit a packet
to.
int next_finished; ///< The next context to return output
from.
@@ -190,6 +207,51 @@ static void thread_set_name(PerThreadContext *p)
ff_thread_setname(name);
}
+// get a free frame to decode into; returns NULL if allocation fails
+static AVFrame *decoded_frames_get_free(DecodedFrames *df)
+{
+    if (df->nb_f == df->nb_f_allocated) {
+        AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1, sizeof(*df->f));
+        if (!tmp)
+            return NULL;
+        df->f = tmp;
+
+        df->f[df->nb_f] = av_frame_alloc();
+        if (!df->f[df->nb_f])
+            return NULL;
+        df->nb_f_allocated++;
+    }
+
+    av_frame_unref(df->f[df->nb_f]);
+    return df->f[df->nb_f];
+}
+
+// move the oldest stored frame into dst; the caller ensures df->nb_f > 0
+static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
+{
+    av_frame_move_ref(dst, df->f[0]);
+    for (size_t i = 1; i < df->nb_f; i++)
+        FFSWAP(AVFrame*, df->f[i - 1], df->f[i]); // emptied frame bubbles to the end
+    df->nb_f--;
+}
+
+// drop all stored frames; the AVFrame structs stay allocated for reuse
+static void decoded_frames_flush(DecodedFrames *df)
+{
+    for (size_t i = 0; i < df->nb_f; i++)
+        av_frame_unref(df->f[i]);
+    df->nb_f = 0;
+}
+
+static void decoded_frames_free(DecodedFrames *df)
+{
+    /* release every frame ever allocated, not only the nb_f holding output */
+    for (size_t i = 0; i < df->nb_f_allocated; i++)
+        av_frame_free(&df->f[i]);
+    av_freep(&df->f);
+    df->nb_f = df->nb_f_allocated = 0;
+}
+
/**
* Codec worker thread.
*
@@ -202,6 +264,7 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
PerThreadContext *p = arg;
AVCodecContext *avctx = p->avctx;
const FFCodec *codec = ffcodec(avctx->codec);
+ int ret;
thread_set_name(p);
@@ -236,16 +299,31 @@ FF_ENABLE_DEPRECATION_WARNINGS
p->hwaccel_serializing = 1;
}
- av_frame_unref(p->frame);
- p->got_frame = 0;
- p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+ ret = 0;
+ while (ret >= 0) {
+ AVFrame *frame;
- if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
- ff_thread_release_buffer(avctx, p->frame);
+ /* get the frame which will store the output */
+ frame = decoded_frames_get_free(&p->df);
+ if (!frame) {
+ p->result = AVERROR(ENOMEM);
+ goto alloc_fail;
+ }
+
+ /* do the actual decoding */
+ ret = ff_decode_receive_frame_internal(avctx, frame);
+ if (ret == 0)
+ p->df.nb_f++;
+ else if (ret < 0 && frame->buf[0])
+ ff_thread_release_buffer(avctx, frame);
+
+ p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
+ }
if (atomic_load(&p->state) == STATE_SETTING_UP)
ff_thread_finish_setup(avctx);
+alloc_fail:
if (p->hwaccel_serializing) {
/* wipe hwaccel state to avoid stale pointers lying around;
* the state was transferred to FrameThreadContext in
@@ -441,23 +519,25 @@ static void release_delayed_buffers(PerThreadContext *p)
#endif
static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
- AVPacket *avpkt)
+ AVPacket *in_pkt)
{
FrameThreadContext *fctx = p->parent;
PerThreadContext *prev_thread = fctx->prev_thread;
- const AVCodec *codec = p->avctx->codec;
- int ret;
-
- if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
- return 0;
+ int err;
pthread_mutex_lock(&p->mutex);
- ret = update_context_from_user(p->avctx, user_avctx);
- if (ret) {
+ av_packet_unref(p->avpkt);
+ av_packet_move_ref(p->avpkt, in_pkt);
+ if (!p->avpkt->size)
+ p->avctx->internal->draining = 1;
+
+ err = update_context_from_user(p->avctx, user_avctx);
+ if (err < 0) {
pthread_mutex_unlock(&p->mutex);
- return ret;
+ return err;
}
+
atomic_store_explicit(&p->debug_threads,
(p->avctx->debug & FF_DEBUG_THREADS) != 0,
memory_order_relaxed);
@@ -467,7 +547,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
#endif
if (prev_thread) {
- int err;
if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
pthread_mutex_lock(&prev_thread->progress_mutex);
while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
@@ -475,10 +554,16 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
pthread_mutex_unlock(&prev_thread->progress_mutex);
}
- err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
- if (err) {
- pthread_mutex_unlock(&p->mutex);
- return err;
+ /* codecs without delay might not be prepared to be called repeatedly here during
+ * flushing (vp3/theora), and also don't need to be, since from this point on, they
+ * will always return EOF anyway */
+ if (!p->avctx->internal->draining || (p->avctx->codec->capabilities &
AV_CODEC_CAP_DELAY))
+ {
+ err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
+ if (err) {
+ pthread_mutex_unlock(&p->mutex);
+ return err;
+ }
}
}
@@ -488,14 +573,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
FFSWAP(void*, p->avctx->hwaccel_context,
fctx->stash_hwaccel_context);
FFSWAP(void*, p->avctx->internal->hwaccel_priv_data,
fctx->stash_hwaccel_priv);
- av_packet_unref(p->avpkt);
- ret = av_packet_ref(p->avpkt, avpkt);
- if (ret < 0) {
- pthread_mutex_unlock(&p->mutex);
- av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in
submit_packet()\n");
- return ret;
- }
-
atomic_store(&p->state, STATE_SETTING_UP);
pthread_cond_signal(&p->input_cond);
pthread_mutex_unlock(&p->mutex);
@@ -539,57 +616,42 @@ FF_ENABLE_DEPRECATION_WARNINGS
#endif
fctx->prev_thread = p;
- fctx->next_decoding++;
+ fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
return 0;
}
-int ff_thread_decode_frame(AVCodecContext *avctx,
- AVFrame *picture, int *got_picture_ptr,
- AVPacket *avpkt)
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
{
FrameThreadContext *fctx = avctx->internal->thread_ctx;
- int finished = fctx->next_finished;
- PerThreadContext *p;
- int err;
+ int ret = 0;
/* release the async lock, permitting blocked hwaccel threads to
* go forward while we are in this function */
async_unlock(fctx);
- /*
- * Submit a packet to the next decoding thread.
- */
+ /* submit packets to threads while there are no buffered results to return */
+ while (!fctx->df.nb_f && !fctx->result) {
+ PerThreadContext *p;
- p = &fctx->threads[fctx->next_decoding];
- err = submit_packet(p, avctx, avpkt);
- if (err)
- goto finish;
-
- /*
- * If we're still receiving the initial packets, don't return a frame.
- */
-
- if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id ==
AV_CODEC_ID_FFV1)))
- fctx->delaying = 0;
+ /* get a packet to be submitted to the next thread */
+ av_packet_unref(fctx->next_pkt);
+ ret = ff_decode_get_packet(avctx, fctx->next_pkt);
+ if (ret < 0 && ret != AVERROR_EOF)
+ goto finish;
- if (fctx->delaying) {
- *got_picture_ptr=0;
- if (avpkt->size) {
- err = avpkt->size;
+ ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+ fctx->next_pkt);
+ if (ret < 0)
goto finish;
- }
- }
- /*
- * Return the next available frame from the oldest thread.
- * If we're at the end of the stream, then we have to skip threads that
- * didn't output a frame/error, because we don't want to accidentally
signal
- * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
- */
+ /* do not return any frames until all threads have something to do */
+ if (fctx->next_decoding != fctx->next_finished &&
+ !avctx->internal->draining)
+ continue;
- do {
- p = &fctx->threads[finished++];
+ p = &fctx->threads[fctx->next_finished];
+ fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
if (atomic_load(&p->state) != STATE_INPUT_READY) {
pthread_mutex_lock(&p->progress_mutex);
@@ -598,35 +660,28 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
pthread_mutex_unlock(&p->progress_mutex);
}
- av_frame_move_ref(picture, p->frame);
- *got_picture_ptr = p->got_frame;
- picture->pkt_dts = p->avpkt->dts;
- err = p->result;
-
- /*
- * A later call with avkpt->size == 0 may loop over all threads,
- * including this one, searching for a frame/error to return before
being
- * stopped by the "finished != fctx->next_finished" condition.
- * Make sure we don't mistakenly return the same frame/error again.
- */
- p->got_frame = 0;
- p->result = 0;
-
- if (finished >= avctx->thread_count) finished = 0;
- } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished !=
fctx->next_finished);
+ update_context_from_thread(avctx, p->avctx, 1);
- update_context_from_thread(avctx, p->avctx, 1);
+ fctx->result = p->result;
+ p->result = 0;
- if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
+ if (p->df.nb_f)
+ FFSWAP(DecodedFrames, fctx->df, p->df);
+ }
- fctx->next_finished = finished;
+ /* a thread may return multiple frames AND an error
+ * we first return all the frames, then the error */
+ if (fctx->df.nb_f) {
+ decoded_frames_pop(&fctx->df, frame);
+ ret = 0;
+ } else {
+ ret = fctx->result;
+ fctx->result = 0;
+ }
- /* return the size of the consumed packet if no error occurred */
- if (err >= 0)
- err = avpkt->size;
finish:
async_lock(fctx);
- return err;
+ return ret;
}
void ff_thread_report_progress(ThreadFrame *f, int n, int field)
@@ -726,7 +781,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
pthread_cond_wait(&p->output_cond, &p->progress_mutex);
pthread_mutex_unlock(&p->progress_mutex);
}
- p->got_frame = 0;
}
async_lock(fctx);
@@ -784,11 +838,12 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_buffer_unref(&ctx->internal->pool);
av_packet_free(&ctx->internal->last_pkt_props);
+ av_packet_free(&ctx->internal->in_pkt);
av_freep(&ctx->internal);
av_buffer_unref(&ctx->hw_frames_ctx);
}
- av_frame_free(&p->frame);
+ decoded_frames_free(&p->df);
ff_pthread_free(p, per_thread_offsets);
av_packet_free(&p->avpkt);
@@ -796,6 +851,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
av_freep(&p->avctx);
}
+ decoded_frames_free(&fctx->df);
+ av_packet_free(&fctx->next_pkt);
+
av_freep(&fctx->threads);
ff_pthread_free(fctx, thread_ctx_offsets);
@@ -854,13 +912,17 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
if (err < 0)
return err;
- if (!(p->frame = av_frame_alloc()) ||
- !(p->avpkt = av_packet_alloc()))
+ if (!(p->avpkt = av_packet_alloc()))
return AVERROR(ENOMEM);
+ copy->internal->is_frame_mt = 1;
if (!first)
copy->internal->is_copy = 1;
+ copy->internal->in_pkt = av_packet_alloc();
+ if (!copy->internal->in_pkt)
+ return AVERROR(ENOMEM);
+
copy->internal->last_pkt_props = av_packet_alloc();
if (!copy->internal->last_pkt_props)
return AVERROR(ENOMEM);
@@ -920,6 +982,10 @@ int ff_frame_thread_init(AVCodecContext *avctx)
return err;
}
+ fctx->next_pkt = av_packet_alloc();
+ if (!fctx->next_pkt)
+ return AVERROR(ENOMEM);
+
fctx->async_lock = 1;
fctx->delaying = 1;
@@ -964,17 +1030,28 @@ void ff_thread_flush(AVCodecContext *avctx)
fctx->next_decoding = fctx->next_finished = 0;
fctx->delaying = 1;
fctx->prev_thread = NULL;
+
+ decoded_frames_flush(&fctx->df);
+
for (i = 0; i < avctx->thread_count; i++) {
PerThreadContext *p = &fctx->threads[i];
- // Make sure decode flush calls with size=0 won't return old frames
- p->got_frame = 0;
- av_frame_unref(p->frame);
- p->result = 0;
+
+ decoded_frames_flush(&p->df);
#if FF_API_THREAD_SAFE_CALLBACKS
release_delayed_buffers(p);
#endif
+ av_packet_unref(p->avctx->internal->last_pkt_props);
+ av_packet_unref(p->avctx->internal->in_pkt);
+
+ p->avctx->pts_correction_last_pts =
+ p->avctx->pts_correction_last_dts = INT64_MIN;
+
+ p->avctx->internal->draining = 0;
+ p->avctx->internal->draining_done = 0;
+ p->avctx->internal->nb_draining_errors = 0;
+
if (ffcodec(avctx->codec)->flush)
ffcodec(avctx->codec)->flush(p->avctx);
}
@@ -1193,3 +1270,15 @@ void ff_thread_release_ext_buffer(AVCodecContext *avctx, ThreadFrame *f)
f->owner[0] = f->owner[1] = NULL;
ff_thread_release_buffer(avctx, f->f);
}
+
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
+{
+    /* the worker's thread_ctx holds the packet moved in by submit_packet() */
+    PerThreadContext *worker = avctx->internal->thread_ctx;
+
+    if (!worker->avpkt->buf)
+        return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);
+
+    av_packet_move_ref(pkt, worker->avpkt);
+    return 0;
+}
diff --git a/libavcodec/thread.h b/libavcodec/thread.h
index d5673f25ea..7ae69990fb 100644
--- a/libavcodec/thread.h
+++ b/libavcodec/thread.h
@@ -40,17 +40,12 @@
void ff_thread_flush(AVCodecContext *avctx);
/**
- * Submit a new frame to a decoding thread.
- * Returns the next available frame in picture. *got_picture_ptr
- * will be 0 if none is available.
- * The return value on success is the size of the consumed packet for
- * compatibility with FFCodec.decode. This means the decoder
- * has to consume the full packet.
+ * Submit available packets for decoding to worker threads, return a
+ * decoded frame if available. Returns AVERROR(EAGAIN) if none is available.
*
- * Parameters are the same as FFCodec.decode.
+ * Parameters are the same as FFCodec.receive_frame.
*/
-int ff_thread_decode_frame(AVCodecContext *avctx, AVFrame *picture,
- int *got_picture_ptr, AVPacket *avpkt);
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame);
/**
* If the codec defines update_thread_context(), call this
@@ -99,6 +94,11 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
*/
void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f);
+/**
+ * Get a packet for decoding. This gets invoked by the worker threads.
+ */
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt);
+
int ff_thread_init(AVCodecContext *s);
int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int
threadnr),