From: Jan Sebechlebsky <sebechlebsky...@gmail.com> Add support for nonblocking calls.
Signed-off-by: Jan Sebechlebsky <sebechlebsky...@gmail.com> --- libavformat/fifo.c | 70 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/libavformat/fifo.c b/libavformat/fifo.c index bc8c973..efd91e3 100644 --- a/libavformat/fifo.c +++ b/libavformat/fifo.c @@ -25,6 +25,7 @@ #include "avformat.h" #include "internal.h" #include "pthread.h" +#include "url.h" #define FIFO_DEFAULT_QUEUE_SIZE 60 #define FIFO_DEFAULT_MAX_RECOVERY_ATTEMPTS 16 @@ -77,6 +78,13 @@ typedef struct FifoContext { * from failure or queue overflow */ uint8_t restart_with_keyframe; + /* Whether termination was requested by invoking deinit + * before the thread was finished. Used only in non-blocking + * mode - when AVFMT_FLAG_NONBLOCK is set. */ + volatile uint8_t termination_requested; + + /* Original interrupt callback of the underlying muxer. */ + AVIOInterruptCB orig_interrupt_callback; } FifoContext; typedef struct FifoThreadContext { @@ -110,6 +118,16 @@ typedef struct FifoMessage { AVPacket pkt; } FifoMessage; +static int fifo_interrupt_callback_wrapper(void *arg) +{ + FifoContext *ctx = arg; + + if (ctx->termination_requested) + return 1; + + return ff_check_interrupt(&ctx->orig_interrupt_callback); +} + static int fifo_thread_write_header(FifoThreadContext *ctx) { AVFormatContext *avf = ctx->avf; @@ -448,6 +466,8 @@ static void *fifo_consumer_thread(void *data) static int fifo_mux_init(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; + AVIOInterruptCB interrupt_cb = {.callback = fifo_interrupt_callback_wrapper, + .opaque = fifo}; AVFormatContext *avf2; int ret = 0, i; @@ -458,7 +478,8 @@ static int fifo_mux_init(AVFormatContext *avf) fifo->avf = avf2; - avf2->interrupt_callback = avf->interrupt_callback; + fifo->orig_interrupt_callback = avf->interrupt_callback; + avf2->interrupt_callback = interrupt_cb; avf2->max_delay = avf->max_delay; ret = av_dict_copy(&avf2->metadata, avf->metadata, 0); if (ret < 0) @@ -543,7 +564,7 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) { FifoContext *fifo = avf->priv_data; FifoMessage msg = {.type = pkt ? FIFO_WRITE_PACKET : FIFO_FLUSH_OUTPUT}; - int ret; + int ret, queue_flags = 0; if (pkt) { av_init_packet(&msg.pkt); @@ -552,15 +573,21 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt) return ret; } - ret = av_thread_message_queue_send(fifo->queue, &msg, - fifo->drop_pkts_on_overflow ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + if (fifo->drop_pkts_on_overflow || (avf->flags & AVFMT_FLAG_NONBLOCK)) + queue_flags |= AVFMT_FLAG_NONBLOCK; + + ret = av_thread_message_queue_send(fifo->queue, &msg, queue_flags); + if (ret == AVERROR(EAGAIN)) { - uint8_t overflow_set = 0; + uint8_t overflow_set; + + if (avf->flags & AVFMT_FLAG_NONBLOCK) + return ret; /* Queue is full, set fifo->overflow_flag to 1 * to let consumer thread know the queue should * be flushed. */ + overflow_set = 0; pthread_mutex_lock(&fifo->overflow_flag_lock); if (!fifo->overflow_flag) fifo->overflow_flag = overflow_set = 1; @@ -588,11 +615,22 @@ static int fifo_write_trailer(AVFormatContext *avf) av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); - ret = pthread_join( fifo->writer_thread, NULL); - if (ret < 0) { - av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", - av_err2str(AVERROR(ret))); - return AVERROR(ret); + if (!(avf->flags & AVFMT_FLAG_NONBLOCK)) { + ret = pthread_join( fifo->writer_thread, NULL); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", + av_err2str(AVERROR(ret))); + return AVERROR(ret); + } + } else { + ret = pthread_tryjoin_np( fifo->writer_thread, NULL); + if (ret == EBUSY) + return AVERROR(EAGAIN); + if (ret) { + av_log(avf, AV_LOG_ERROR, "pthread_tryjoin_np error: %s\n", + av_err2str(AVERROR(ret))); + return AVERROR(ret); + } } ret = fifo->write_trailer_ret; @@ -603,6 +641,16 @@ static void fifo_deinit(AVFormatContext *avf) { FifoContext *fifo = avf->priv_data; + if (avf->flags & AVFMT_FLAG_NONBLOCK) { + int ret; + fifo->termination_requested = 1; + ret = pthread_join( fifo->writer_thread, NULL); + if (ret < 0) { + av_log(avf, AV_LOG_ERROR, "pthread join error: %s\n", + av_err2str(AVERROR(ret))); + } + } + if (fifo->format_options) av_dict_free(&fifo->format_options); -- 1.9.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org http://ffmpeg.org/mailman/listinfo/ffmpeg-devel