On Sun, 04. Aug 14:36, Andriy Gelman wrote: > Changes in v2: > 1. Replaced zmq_poll with zmq_msg_recv. > 2. Remove user timeout option as zmq_msg_recv(.., .., ZMQ_DONTWAIT) is a > non-blocking call. > 3. Updated docs. > > Andriy > > From 53e6e00d30c9fbf5127eea9d377686d37e981c0c Mon Sep 17 00:00:00 2001 > From: Andriy Gelman <andriy.gel...@gmail.com> > Date: Tue, 30 Jul 2019 14:39:32 -0400 > Subject: [PATCH] libavformat: Add ZeroMQ as a protocol option > > Currently multiple clients are only supported by using a > multicast destination address. An alternative is to stream to a server > which re-distributes the content. > > This commit adds ZeroMQ as a protocol option, which allows > multiple clients to connect to a single ffmpeg instance. > --- > configure | 2 + > doc/general.texi | 1 + > doc/protocols.texi | 32 ++++++++ > libavformat/Makefile | 1 + > libavformat/libzmq.c | 159 ++++++++++++++++++++++++++++++++++++++++ > libavformat/protocols.c | 1 + > 6 files changed, 196 insertions(+) > create mode 100644 libavformat/libzmq.c > > diff --git a/configure b/configure > index 5a4f507246..4515341b06 100755 > --- a/configure > +++ b/configure > @@ -3400,6 +3400,8 @@ libsrt_protocol_deps="libsrt" > libsrt_protocol_select="network" > libssh_protocol_deps="libssh" > libtls_conflict="openssl gnutls mbedtls" > +libzmq_protocol_deps="libzmq" > +libzmq_protocol_select="network" > > # filters > afftdn_filter_deps="avcodec" > diff --git a/doc/general.texi b/doc/general.texi > index 3c0c803449..b8e063268c 100644 > --- a/doc/general.texi > +++ b/doc/general.texi > @@ -1329,6 +1329,7 @@ performance on systems without hardware floating point > support). > @item TCP @tab X > @item TLS @tab X > @item UDP @tab X > +@item ZMQ @tab E > @end multitable > > @code{X} means that the protocol is supported. > diff --git a/doc/protocols.texi b/doc/protocols.texi > index 3e4e7af3d4..174eaacd0f 100644 > --- a/doc/protocols.texi > +++ b/doc/protocols.texi > @@ -1728,4 +1728,36 @@ Timeout in ms. > Create the Unix socket in listening mode. > @end table > > +@section libzmq > + > +ZeroMQ asynchronous messaging library. > + > +This library supports unicast streaming to multiple clients without relying > on > +an external server. > + > +The required syntax for streaming or connecting to a stream is: > +@example > +zmq:tcp://ip-address:port > +@end example > + > +Example: > +Create a localhost stream on port 5555: > +@example > +ffmpeg -re -i input -f mpegts zmq:tcp://127.0.0.1:5555 > +@end example > + > +Multiple clients may connect to the stream using: > +@example > +ffplay zmq:tcp://127.0.0.1:5555 > +@end example > + > +Streaming to multiple clients is implemented using a ZMQ Pub-Sub pattern. > +The server binds to a port and publishes data. Clients connect to the > +server (IP address/port) and subscribe to the stream. The order in which > +the server and client start generally does not matter. > + > +ffmpeg must be compiled with the --enable-libzmq option to support > +this protocol option. See the compilation guide > @url{https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu} > +for an example on how this option may be set. > + > @c man end PROTOCOLS > diff --git a/libavformat/Makefile b/libavformat/Makefile > index a434b005a4..4a535429b7 100644 > --- a/libavformat/Makefile > +++ b/libavformat/Makefile > @@ -631,6 +631,7 @@ OBJS-$(CONFIG_LIBRTMPTE_PROTOCOL) += librtmp.o > OBJS-$(CONFIG_LIBSMBCLIENT_PROTOCOL) += libsmbclient.o > OBJS-$(CONFIG_LIBSRT_PROTOCOL) += libsrt.o > OBJS-$(CONFIG_LIBSSH_PROTOCOL) += libssh.o > +OBJS-$(CONFIG_LIBZMQ_PROTOCOL) += > libzmq.o > > # libavdevice dependencies > OBJS-$(CONFIG_IEC61883_INDEV) += dv.o > diff --git a/libavformat/libzmq.c b/libavformat/libzmq.c > new file mode 100644 > index 0000000000..24eebb1fee > --- /dev/null > +++ b/libavformat/libzmq.c > @@ -0,0 +1,159 @@ > +/* > + * ZMQ URLProtocol > + * > + * This file is part of FFmpeg. > + * > + * FFmpeg is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * FFmpeg is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with FFmpeg; if not, write to the Free Software > + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 > USA > + */ > + > +#include <zmq.h> > +#include "url.h" > +#include "network.h" > +#include "libavutil/avstring.h" > +#include "libavutil/opt.h" > + > +typedef struct ZMQContext { > + const AVClass *class; > + void *context; > + void *socket; > + unsigned int timeout; /*blocking timeout during zmq poll in milliseconds > */ > +} ZMQContext; > + > +static const AVOption options[] = { > + { NULL } > +}; > + > +static int ff_zmq_open(URLContext *h, const char *uri, int flags) > +{ > + int ret; > + ZMQContext *s = h->priv_data; > + s->context = zmq_ctx_new(); > + h->is_streamed = 1; > + > + av_strstart(uri, "zmq:", &uri); > + > + /*publish during write*/ > + if (h->flags & AVIO_FLAG_WRITE) { > + s->socket = zmq_socket(s->context, ZMQ_PUB); > + if (!s->socket) { > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): > %s\n", zmq_strerror(errno)); > + zmq_ctx_destroy(s->context); > + return AVERROR_EXTERNAL; > + } > + > + ret = zmq_bind(s->socket, uri); > + if (ret < 0) { > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", > zmq_strerror(errno)); > + zmq_close(s->socket); > + zmq_ctx_destroy(s->context); > + return AVERROR_EXTERNAL; > + } > + } > + > + /*subscribe for read*/ > + if (h->flags & AVIO_FLAG_READ) { > + s->socket = zmq_socket(s->context, ZMQ_SUB); > + if (!s->socket) { > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): > %s\n", zmq_strerror(errno)); > + zmq_ctx_destroy(s->context); > + return AVERROR_EXTERNAL; > + } > + > + zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0); > + ret = zmq_connect(s->socket, uri); > + if (ret == -1) { > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): > %s\n", zmq_strerror(errno)); > + zmq_close(s->socket); > + zmq_ctx_destroy(s->context); > + return AVERROR_EXTERNAL; > + } > + } > + return 0; > +} > + > +static int ff_zmq_write(URLContext *h, const unsigned char *buf, int size) > +{ > + int ret; > + ZMQContext *s = h->priv_data; > + > + ret = zmq_send(s->socket, buf, size, ZMQ_DONTWAIT); > + if (ret >= 0) > + return ret; /*number of sent bytes*/ > + > + /*errno = EAGAIN if messages cannot be pushed*/ > + if (ret == -1 && errno == EAGAIN) { > + return AVERROR(EAGAIN); > + } else > + return AVERROR_EXTERNAL; > +} > + > +static int ff_zmq_read(URLContext *h, unsigned char *buf, int size) > +{ > + int ret; > + ZMQContext *s = h->priv_data; > + zmq_msg_t msg; > + int msg_size; > + > + ret = zmq_msg_init(&msg); > + if (ret == -1) { > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_init(): %s\n", > zmq_strerror(errno)); > + return AVERROR_EXTERNAL; > + } > + > + ret = zmq_msg_recv(&msg, s->socket, ZMQ_DONTWAIT); > + if (ret == -1) { > + ret = (errno == EAGAIN) ? AVERROR(EAGAIN) : AVERROR_EXTERNAL; > + if (ret == AVERROR_EXTERNAL) > + av_log(h, AV_LOG_ERROR, "Error occured during zmq_msg_recv(): > %s\n", zmq_strerror(errno)); > + goto finish; > + } > + > + msg_size = zmq_msg_size(&msg); > + if (msg_size > size) { > + msg_size = size; > + av_log(h, AV_LOG_WARNING, "ZMQ message exceeds available space in > the buffer. Message will be truncated\n"); > + } > + memcpy(buf, zmq_msg_data(&msg), msg_size); > + > +finish: > + zmq_msg_close(&msg); > + return ret; > +} > + > +static int ff_zmq_close(URLContext *h) > +{ > + ZMQContext *s = h->priv_data; > + zmq_close(s->socket); > + zmq_ctx_destroy(s->context); > + return 0; > +} > + > +static const AVClass zmq_context_class = { > + .class_name = "zmq", > + .item_name = av_default_item_name, > + .option = options, > + .version = LIBAVUTIL_VERSION_INT, > +}; > + > +const URLProtocol ff_libzmq_protocol = { > + .name = "zmq", > + .url_close = ff_zmq_close, > + .url_open = ff_zmq_open, > + .url_read = ff_zmq_read, > + .url_write = ff_zmq_write, > + .priv_data_size = sizeof(ZMQContext), > + .priv_data_class = &zmq_context_class, > + .flags = URL_PROTOCOL_FLAG_NETWORK, > +}; > diff --git a/libavformat/protocols.c b/libavformat/protocols.c > index ad95659795..face5b29b5 100644 > --- a/libavformat/protocols.c > +++ b/libavformat/protocols.c > @@ -68,6 +68,7 @@ extern const URLProtocol ff_librtmpte_protocol; > extern const URLProtocol ff_libsrt_protocol; > extern const URLProtocol ff_libssh_protocol; > extern const URLProtocol ff_libsmbclient_protocol; > +extern const URLProtocol ff_libzmq_protocol; > > #include "libavformat/protocol_list.c" > > -- > 2.22.0
ping _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".