The queue needs to track each frame/packet's stream index, this is achieved by maintaining a parallel AVFifo instance for that purpose. This is simpler than implementing custom AVContainerFifo callbacks. --- fftools/ffmpeg_sched.c | 14 ++------- fftools/ffmpeg_utils.h | 10 ------ fftools/thread_queue.c | 71 ++++++++++++++++++++---------------------- fftools/thread_queue.h | 10 +++--- 4 files changed, 42 insertions(+), 63 deletions(-)
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 6a58661a5c..420d3f9ce9 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -375,7 +375,6 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si enum QueueType type) { ThreadQueue *tq; - ObjPool *op; if (queue_size <= 0) { if (type == QUEUE_FRAMES) @@ -393,18 +392,11 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE); } - op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() : - objpool_alloc_frames(); - if (!op) + tq = tq_alloc(nb_streams, queue_size, + (type == QUEUE_PACKETS) ? THREAD_QUEUE_PACKETS : THREAD_QUEUE_FRAMES); + if (!tq) return AVERROR(ENOMEM); - tq = tq_alloc(nb_streams, queue_size, op, - (type == QUEUE_PACKETS) ? pkt_move : frame_move); - if (!tq) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - *ptq = tq; return 0; } diff --git a/fftools/ffmpeg_utils.h b/fftools/ffmpeg_utils.h index 7939e44cdc..8ed6f81b28 100644 --- a/fftools/ffmpeg_utils.h +++ b/fftools/ffmpeg_utils.h @@ -44,14 +44,4 @@ static inline int err_merge(int err0, int err1) return (err0 < 0) ? err0 : FFMIN(err1, 0); } -static inline void pkt_move(void *dst, void *src) -{ - av_packet_move_ref(dst, src); -} - -static inline void frame_move(void *dst, void *src) -{ - av_frame_move_ref(dst, src); -} - #endif // FFTOOLS_FFMPEG_UTILS_H diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c index fd73cc0a9b..b035ffe11d 100644 --- a/fftools/thread_queue.c +++ b/fftools/thread_queue.c @@ -20,13 +20,16 @@ #include <string.h> #include "libavutil/avassert.h" +#include "libavutil/container_fifo.h" #include "libavutil/error.h" #include "libavutil/fifo.h" +#include "libavutil/frame.h" #include "libavutil/intreadwrite.h" #include "libavutil/mem.h" #include "libavutil/thread.h" -#include "objpool.h" +#include "libavcodec/packet.h" + #include "thread_queue.h" enum { @@ -34,19 +37,14 @@ enum { FINISHED_RECV = (1 << 1), }; -typedef struct FifoElem { - void *obj; - unsigned int stream_idx; -} FifoElem; - struct ThreadQueue { int *finished; unsigned int nb_streams; - AVFifo *fifo; + enum ThreadQueueType type; - ObjPool *obj_pool; - void (*obj_move)(void *dst, void *src); + AVContainerFifo *fifo; + AVFifo *fifo_stream_index; pthread_mutex_t lock; pthread_cond_t cond; @@ -59,14 +57,8 @@ void tq_free(ThreadQueue **ptq) if (!tq) return; - if (tq->fifo) { - FifoElem elem; - while (av_fifo_read(tq->fifo, &elem, 1) >= 0) - objpool_release(tq->obj_pool, &elem.obj); - } - av_fifo_freep2(&tq->fifo); - - objpool_free(&tq->obj_pool); + av_container_fifo_free(&tq->fifo); + av_fifo_freep2(&tq->fifo_stream_index); av_freep(&tq->finished); @@ -77,7 +69,7 @@ void tq_free(ThreadQueue **ptq) } ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size, - ObjPool *obj_pool, void (*obj_move)(void *dst, void *src)) + enum ThreadQueueType type) { ThreadQueue *tq; int ret; @@ -104,12 +96,16 @@ ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size, goto fail; tq->nb_streams = nb_streams; - tq->fifo = av_fifo_alloc2(queue_size, sizeof(FifoElem), 0); + tq->type = type; + + tq->fifo = (type == THREAD_QUEUE_FRAMES) ? + av_container_fifo_alloc_avframe(0) : av_container_fifo_alloc_avpacket(0); if (!tq->fifo) goto fail; - tq->obj_pool = obj_pool; - tq->obj_move = obj_move; + tq->fifo_stream_index = av_fifo_alloc2(queue_size, sizeof(unsigned), 0); + if (!tq->fifo_stream_index) + goto fail; return tq; fail: @@ -132,23 +128,21 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data) goto finish; } - while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo)) + while (!(*finished & FINISHED_RECV) && !av_fifo_can_write(tq->fifo_stream_index)) pthread_cond_wait(&tq->cond, &tq->lock); if (*finished & FINISHED_RECV) { ret = AVERROR_EOF; *finished |= FINISHED_SEND; } else { - FifoElem elem = { .stream_idx = stream_idx }; - - ret = objpool_get(tq->obj_pool, &elem.obj); + ret = av_fifo_write(tq->fifo_stream_index, &stream_idx, 1); if (ret < 0) goto finish; - tq->obj_move(elem.obj, data); + ret = av_container_fifo_write(tq->fifo, data, 0); + if (ret < 0) + goto finish; - ret = av_fifo_write(tq->fifo, &elem, 1); - av_assert0(ret >= 0); pthread_cond_broadcast(&tq->cond); } @@ -161,18 +155,21 @@ finish: static int receive_locked(ThreadQueue *tq, int *stream_idx, void *data) { - FifoElem elem; unsigned int nb_finished = 0; - while (av_fifo_read(tq->fifo, &elem, 1) >= 0) { - if (tq->finished[elem.stream_idx] & FINISHED_RECV) { - objpool_release(tq->obj_pool, &elem.obj); + while (av_container_fifo_read(tq->fifo, data, 0) >= 0) { + unsigned idx; + int ret; + + ret = av_fifo_read(tq->fifo_stream_index, &idx, 1); + av_assert0(ret >= 0); + if (tq->finished[idx] & FINISHED_RECV) { + (tq->type == THREAD_QUEUE_FRAMES) ? + av_frame_unref(data) : av_packet_unref(data); continue; } - tq->obj_move(data, elem.obj); - objpool_release(tq->obj_pool, &elem.obj); - *stream_idx = elem.stream_idx; + *stream_idx = idx; return 0; } @@ -202,12 +199,12 @@ int tq_receive(ThreadQueue *tq, int *stream_idx, void *data) pthread_mutex_lock(&tq->lock); while (1) { - size_t can_read = av_fifo_can_read(tq->fifo); + size_t can_read = av_container_fifo_can_read(tq->fifo); ret = receive_locked(tq, stream_idx, data); // signal other threads if the fifo state changed - if (can_read != av_fifo_can_read(tq->fifo)) + if (can_read != av_container_fifo_can_read(tq->fifo)) pthread_cond_broadcast(&tq->cond); if (ret == AVERROR(EAGAIN)) { diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h index 0cc8c71ebd..cc01c8a2c9 100644 --- a/fftools/thread_queue.h +++ b/fftools/thread_queue.h @@ -21,7 +21,10 @@ #include <string.h> -#include "objpool.h" +enum ThreadQueueType { + THREAD_QUEUE_FRAMES, + THREAD_QUEUE_PACKETS, +}; typedef struct ThreadQueue ThreadQueue; @@ -32,12 +35,9 @@ typedef struct ThreadQueue ThreadQueue; * maintained * @param queue_size number of items that can be stored in the queue without * blocking - * @param obj_pool object pool that will be used to allocate items stored in the - * queue; the pool becomes owned by the queue - * @param callback that moves the contents between two data pointers */ ThreadQueue *tq_alloc(unsigned int nb_streams, size_t queue_size, - ObjPool *obj_pool, void (*obj_move)(void *dst, void *src)); + enum ThreadQueueType type); void tq_free(ThreadQueue **tq); /** -- 2.43.0 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".