libRIST internally stores packets in a FIFO of 1024 packets, overwriting old packets when they are not read at a sufficient pace. Unfortunately, this results in many FIFO overflow errors when ffmpeg consumes a libRIST stream. This patch creates a receiver thread based on the UDP circular buffer code.
Signed-off-by: Gijs Peskens <g...@peskens.net> --- libavformat/librist.c | 201 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 196 insertions(+), 5 deletions(-) diff --git a/libavformat/librist.c b/libavformat/librist.c index b120346f48..47c01a8432 100644 --- a/libavformat/librist.c +++ b/libavformat/librist.c @@ -26,6 +26,8 @@ #include "libavutil/opt.h" #include "libavutil/parseutils.h" #include "libavutil/time.h" +#include "libavutil/fifo.h" +#include "libavutil/intreadwrite.h" #include "avformat.h" #include "internal.h" @@ -33,6 +35,15 @@ #include "os_support.h" #include "url.h" +#if HAVE_W32THREADS +#undef HAVE_PTHREAD_CANCEL +#define HAVE_PTHREAD_CANCEL 1 +#endif + +#if HAVE_PTHREAD_CANCEL +#include "libavutil/thread.h" +#endif + #include <librist/librist.h> #include <librist/version.h> // RIST_MAX_PACKET_SIZE - 28 minimum protocol overhead @@ -67,6 +78,19 @@ typedef struct RISTContext { struct rist_peer *peer; struct rist_ctx *ctx; + + int circular_buffer_size; + AVFifoBuffer *fifo; + int circular_buffer_error; + int overrun_nonfatal; + +#if HAVE_PTHREAD_CANCEL + pthread_t receiver_thread; + pthread_mutex_t mutex; + pthread_cond_t cond; + int thread_started; + int thread_stop; +#endif } RISTContext; #define D AV_OPT_FLAG_DECODING_PARAM @@ -82,6 +106,8 @@ static const AVOption librist_options[] = { { "log_level", "set loglevel", OFFSET(log_level), AV_OPT_TYPE_INT, {.i64=RIST_LOG_INFO}, -1, INT_MAX, .flags = D|E }, { "secret", "set encryption secret",OFFSET(secret), AV_OPT_TYPE_STRING,{.str=NULL}, 0, 0, .flags = D|E }, { "encryption","set encryption type",OFFSET(encryption), AV_OPT_TYPE_INT ,{.i64=0}, 0, INT_MAX, .flags = D|E }, + { "fifo_size", "set the receiving circular buffer size, expressed as a number of packets with size of 188 bytes, 0 to disable", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D }, + { "overrun_nonfatal", "survive in case of receiving circular buffer overrun", OFFSET(overrun_nonfatal), 
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, D }, { NULL } }; @@ -119,6 +145,15 @@ static int librist_close(URLContext *h) RISTContext *s = h->priv_data; int ret = 0; +#if HAVE_PTHREAD_CANCEL + if (s->thread_started) { + pthread_mutex_lock(&s->mutex); + s->thread_stop = 1; + pthread_mutex_unlock(&s->mutex); + pthread_join(s->receiver_thread, NULL); + } +#endif + av_fifo_freep(&s->fifo); s->peer = NULL; if (s->ctx) @@ -128,6 +163,78 @@ static int librist_close(URLContext *h) return risterr2ret(ret); } +static void *receiver_thread(void *_url_context) +{ + URLContext *h = _url_context; + RISTContext *s = h->priv_data; + int ret; + uint8_t tmp[4]; +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + const struct rist_data_block *data_block; +#else + struct rist_data_block *data_block; +#endif + + while (1) + { + pthread_mutex_lock(&s->mutex); + if (s->thread_stop) + break; + pthread_mutex_unlock(&s->mutex); +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME); +#else + ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME); +#endif + if (ret == 0) + continue; + + pthread_mutex_lock(&s->mutex); + if (ret < 0) { + s->circular_buffer_error = ret; + break; + } + + if (data_block->payload_len > MAX_PAYLOAD_SIZE) { +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + rist_receiver_data_block_free((struct rist_data_block**)&data_block); +#else + rist_receiver_data_block_free2(&data_block); +#endif + s->circular_buffer_error = AVERROR_EXTERNAL; + break; + } + AV_WL32(tmp, data_block->payload_len); + if (av_fifo_space(s->fifo) < (data_block->payload_len +4)) + { + /* No Space left */ + if (s->overrun_nonfatal) { + av_log(h, AV_LOG_WARNING, "Circular buffer overrun. " + "Surviving due to overrun_nonfatal option\n"); + continue; + } else { + av_log(h, AV_LOG_ERROR, "Circular buffer overrun. " + "To avoid, increase fifo_size URL option. 
" + "To survive in such case, use overrun_nonfatal option\n"); + s->circular_buffer_error = AVERROR(EIO); + break; + } + } + av_fifo_generic_write(s->fifo, tmp, 4, NULL); + av_fifo_generic_write(s->fifo, (void*)data_block->payload, data_block->payload_len, NULL); + pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + rist_receiver_data_block_free((struct rist_data_block**)&data_block); +#else + rist_receiver_data_block_free2(&data_block); +#endif + } + pthread_mutex_unlock(&s->mutex); + pthread_cond_signal(&s->cond); + return NULL; +} + static int librist_open(URLContext *h, const char *uri, int flags) { RISTContext *s = h->priv_data; @@ -194,27 +301,111 @@ static int librist_open(URLContext *h, const char *uri, int flags) if (ret < 0) goto err; + s->circular_buffer_size *= 188; + +#if HAVE_PTHREAD_CANCEL + //Create receiver thread if circular buffer size is set and we are receiving + if ((flags & AVIO_FLAG_READ) && s->circular_buffer_size > 0) { + /* start the task going */ + s->fifo = av_fifo_alloc(s->circular_buffer_size); + if (!s->fifo) { + ret = AVERROR(ENOMEM); + goto err; + } + ret = pthread_mutex_init(&s->mutex, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto err; + } + ret = pthread_cond_init(&s->cond, NULL); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto cond_fail; + } + ret = pthread_create(&s->receiver_thread, NULL, receiver_thread, h); + if (ret != 0) { + av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret)); + ret = AVERROR(ret); + goto thread_fail; + } + s->thread_started = 1; + } +#endif return 0; - +#if HAVE_PTHREAD_CANCEL + thread_fail: + pthread_cond_destroy(&s->cond); + cond_fail: + pthread_mutex_destroy(&s->mutex); +#endif err: librist_close(h); - + av_fifo_freep(&s->fifo); return risterr2ret(ret); } static 
int librist_read(URLContext *h, uint8_t *buf, int size) { RISTContext *s = h->priv_data; +#if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 + const struct rist_data_block *data_block; +#else + struct rist_data_block *data_block; +#endif int ret; +#if HAVE_PTHREAD_CANCEL + int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK; + + if (s->fifo) { + pthread_mutex_lock(&s->mutex); + do { + avail = av_fifo_size(s->fifo); + if (avail) { // >=size) { + uint8_t tmp[4]; + + av_fifo_generic_read(s->fifo, tmp, 4, NULL); + avail = AV_RL32(tmp); + if(avail > size){ + av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n"); + avail = size; + } + + av_fifo_generic_read(s->fifo, buf, avail, NULL); + av_fifo_drain(s->fifo, AV_RL32(tmp) - avail); + pthread_mutex_unlock(&s->mutex); + return avail; + } else if(s->circular_buffer_error){ + int err = s->circular_buffer_error; + pthread_mutex_unlock(&s->mutex); + return err; + } else if(nonblock) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(EAGAIN); + } else { + /* FIXME: using the monotonic clock would be better, + but it does not exist on all supported platforms. */ + int64_t t = av_gettime() + 100000; + struct timespec tv = { .tv_sec = t / 1000000, + .tv_nsec = (t % 1000000) * 1000 }; + int err = pthread_cond_timedwait(&s->cond, &s->mutex, &tv); + if (err) { + pthread_mutex_unlock(&s->mutex); + return AVERROR(err == ETIMEDOUT ? 
EAGAIN : err); + } + nonblock = 1; + } + } while(1); + } +#endif + #if FF_LIBRIST_VERSION < FF_LIBRIST_VERSION_41 - const struct rist_data_block *data_block; ret = rist_receiver_data_read(s->ctx, &data_block, POLLING_TIME); #else - struct rist_data_block *data_block; ret = rist_receiver_data_read2(s->ctx, &data_block, POLLING_TIME); #endif - if (ret < 0) return risterr2ret(ret); -- 2.30.2 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".