Andreas Rheinhardt: > Up until now, when doing frame thread encoding, each worker thread > tried to allocate an AVPacket for every AVFrame to be encoded; said > packets would then be handed back to the main thread, where the content > of said packet is copied into the packet actually destined for output; > the temporary AVPacket is then freed. > > Besides being wasteful this also has another problem: There is a risk of > deadlock, namely if no AVPacket can be allocated at all. The user > doesn't get an error at all in this case and the worker threads will > simply try to allocate a packet again and again. If the user has > supplied enough frames, the user's thread will block until a task has > been completed, which just doesn't happen if no packet can ever be > allocated. > > This patch instead modifies the code to allocate the packets during > init; they are then reused again and again. > > Signed-off-by: Andreas Rheinhardt <andreas.rheinha...@gmail.com> > --- > libavcodec/frame_thread_encoder.c | 61 +++++++++++++++++++------------ > 1 file changed, 37 insertions(+), 24 deletions(-) > > diff --git a/libavcodec/frame_thread_encoder.c > b/libavcodec/frame_thread_encoder.c > index 9ca34e7ffb..bcd3c94f8b 100644 > --- a/libavcodec/frame_thread_encoder.c > +++ b/libavcodec/frame_thread_encoder.c > @@ -32,13 +32,18 @@ > #include "thread.h" > > #define MAX_THREADS 64 > -#define BUFFER_SIZE (2*MAX_THREADS) > +/* There can be as many as MAX_THREADS + 1 outstanding tasks. > + * An additional + 1 is needed so that one can distinguish > + * the case of zero and MAX_THREADS + 1 outstanding tasks modulo > + * the number of buffers. */ > +#define BUFFER_SIZE (MAX_THREADS + 2) > > typedef struct{ > AVFrame *indata; > AVPacket *outdata; > int64_t return_code; > unsigned index; > + int finished; > } Task; > > typedef struct{ > @@ -49,8 +54,9 @@ typedef struct{ > pthread_mutex_t task_fifo_mutex; > pthread_cond_t task_fifo_cond; > > - Task finished_tasks[BUFFER_SIZE]; > - pthread_mutex_t finished_task_mutex; > + unsigned max_tasks; > + Task tasks[BUFFER_SIZE]; > + pthread_mutex_t finished_task_mutex; /* Guards tasks[i].finished */ > pthread_cond_t finished_task_cond; > > unsigned task_index; > @@ -63,17 +69,13 @@ typedef struct{ > static void * attribute_align_arg worker(void *v){ > AVCodecContext *avctx = v; > ThreadContext *c = avctx->internal->frame_thread_encoder; > - AVPacket *pkt = NULL; > > while (!atomic_load(&c->exit)) { > int got_packet = 0, ret; > + AVPacket *pkt; > AVFrame *frame; > Task task; > > - if(!pkt) pkt = av_packet_alloc(); > - if(!pkt) continue; > - av_init_packet(pkt); > - > pthread_mutex_lock(&c->task_fifo_mutex); > while (av_fifo_size(c->task_fifo) <= 0 || atomic_load(&c->exit)) { > if (atomic_load(&c->exit)) { > @@ -84,7 +86,12 @@ static void * attribute_align_arg worker(void *v){ > } > av_fifo_generic_read(c->task_fifo, &task, sizeof(task), NULL); > pthread_mutex_unlock(&c->task_fifo_mutex); > + /* The main thread ensures that any two outstanding tasks have > + * different indices, ergo each worker thread owns its element > + * of c->tasks with the exception of finished, which is shared > + * with the main thread and guarded by finished_task_mutex. */ > frame = task.indata; > + pkt = c->tasks[task.index].outdata; > > ret = avctx->codec->encode2(avctx, pkt, frame, &got_packet); > if(got_packet) { > @@ -101,13 +108,12 @@ static void * attribute_align_arg worker(void *v){ > pthread_mutex_unlock(&c->buffer_mutex); > av_frame_free(&frame); > pthread_mutex_lock(&c->finished_task_mutex); > - c->finished_tasks[task.index].outdata = pkt; pkt = NULL; > - c->finished_tasks[task.index].return_code = ret; > + c->tasks[task.index].return_code = ret; > + c->tasks[task.index].finished = 1; > pthread_cond_signal(&c->finished_task_cond); > pthread_mutex_unlock(&c->finished_task_mutex); > } > end: > - av_free(pkt); > pthread_mutex_lock(&c->buffer_mutex); > avcodec_close(avctx); > pthread_mutex_unlock(&c->buffer_mutex); > @@ -194,6 +200,12 @@ int ff_frame_thread_encoder_init(AVCodecContext *avctx, > AVDictionary *options){ > pthread_cond_init(&c->finished_task_cond, NULL); > atomic_init(&c->exit, 0); > > + c->max_tasks = avctx->thread_count + 2; > + for (unsigned i = 0; i < c->max_tasks; i++) { > + if (!(c->tasks[i].outdata = av_packet_alloc())) > + goto fail; > + } > + > for(i=0; i<avctx->thread_count ; i++){ > AVDictionary *tmp = NULL; > int ret; > @@ -261,8 +273,8 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ > av_frame_free(&task.indata); > } > > - for (i=0; i<BUFFER_SIZE; i++) { > - av_packet_free(&c->finished_tasks[i].outdata); > + for (unsigned i = 0; i < c->max_tasks; i++) { > + av_packet_free(&c->tasks[i].outdata); > } > > pthread_mutex_destroy(&c->task_fifo_mutex); > @@ -276,7 +288,7 @@ void ff_frame_thread_encoder_free(AVCodecContext *avctx){ > > int ff_thread_video_encode_frame(AVCodecContext *avctx, AVPacket *pkt, const > AVFrame *frame, int *got_packet_ptr){ > ThreadContext *c = avctx->internal->frame_thread_encoder; > - Task task; > + Task *outtask, task; > int ret; > > av_assert1(!*got_packet_ptr); > @@ -298,27 +310,28 @@ int ff_thread_video_encode_frame(AVCodecContext *avctx, > AVPacket *pkt, const AVF > pthread_cond_signal(&c->task_fifo_cond); > pthread_mutex_unlock(&c->task_fifo_mutex); > > - c->task_index = (c->task_index+1) % BUFFER_SIZE; > + c->task_index = (c->task_index + 1) % c->max_tasks; > } > > + outtask = &c->tasks[c->finished_task_index]; > pthread_mutex_lock(&c->finished_task_mutex); > if (c->task_index == c->finished_task_index || > - (frame && !c->finished_tasks[c->finished_task_index].outdata && > - (c->task_index - c->finished_task_index) % BUFFER_SIZE <= > avctx->thread_count)) { > + (frame && !outtask->finished && > + (c->task_index - c->finished_task_index + c->max_tasks) % > c->max_tasks <= avctx->thread_count)) { > pthread_mutex_unlock(&c->finished_task_mutex); > return 0; > } > - > - while (!c->finished_tasks[c->finished_task_index].outdata) { > + while (!outtask->finished) { > pthread_cond_wait(&c->finished_task_cond, &c->finished_task_mutex); > } > - task = c->finished_tasks[c->finished_task_index]; > - *pkt = *(AVPacket*)(task.outdata); > + /* We now own outtask completely: No worker thread touches it any more, > + * because there is no outstanding task with this index. */ > + outtask->finished = 0; > + av_packet_move_ref(pkt, outtask->outdata); > if(pkt->data) > *got_packet_ptr = 1; > - av_freep(&c->finished_tasks[c->finished_task_index].outdata); > - c->finished_task_index = (c->finished_task_index+1) % BUFFER_SIZE; > + c->finished_task_index = (c->finished_task_index + 1) % c->max_tasks; > pthread_mutex_unlock(&c->finished_task_mutex); > > - return task.return_code; > + return outtask->return_code; > } > Will apply this patchset tomorrow unless there are objections. Thanks to Paul for looking at some of the patches.
- Andreas _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".