On Sat, 29 Feb 2020, Paul B Mahol wrote:

I think this was already rejected?

jb questioned if this belongs to libavformat, and timo asked how well the message brokers handle high bitrates/big message sizes, no hard rejects were made as far as I remember.

Andriy provided numbers for scaling, I have not answered the concerns regarding libavformat integration, because I am not sure I understand the concern. AMQP is a general purpose protocol for message transfer, it even has an official URL scheme, so when we integrate it into libavformat as a *protocol* I don't really see why it would not fit into the framework or what can be gained if it is implemented separately.

If people still have hard feelings against merging this, please speak up, but I honestly don't see a problem with it.

Thanks,
Marton


On 2/28/20, Andriy Gelman <andriy.gel...@gmail.com> wrote:
From: Andriy Gelman <andriy.gel...@gmail.com>

Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.

Signed-off-by: Andriy Gelman <andriy.gel...@gmail.com>
---

Changes in v2:
    - Addressed comments from Marton
    - Updated documentation

Compilation notes:
    - Requires librabbitmq-dev package (on ubuntu).
    - The pkg-config libprabbitmq.pc has a corrupt entry.
      **update: fixed on the github master branch**
      The line "Libs.private: rt; -lpthread" should be changed to
      "Libs.private: -lrt -pthread".
    - Compile FFmpeg with --enable-librabbitmq

To run an example:
#
# Start the RabbitMQ broker (I use docker)
# The following starts the broker on localhost:5672. A webui is available on
# localhost:15672 (User/password is "guest" by default)
#
$ docker run -it --rm --name rabbitmq -p 127.0.0.1:5672:5672 -p
127.0.0.1:15672:15672 rabbitmq:3-management

#
# Stream to the RabbitMQ broker:
#
$ ./ffmpeg -re -f lavfi -i yuvtestsrc -codec:v libx264 -f mpegts
-routing_key "amqp" -exchange "amq.direct" amqp://localhost

#
# Connect any number of clients to fetch data from the broker:
# The clients are filtered by the routing_key and exchange.
#
$ ./ffplay -routing_key "amqp" -exchange "amq.direct" amqp://localhost

 Changelog               |   1 +
 configure               |   5 +
 doc/general.texi        |   1 +
 doc/protocols.texi      |  60 +++++++++
 libavformat/Makefile    |   1 +
 libavformat/libamqp.c   | 286 ++++++++++++++++++++++++++++++++++++++++
 libavformat/protocols.c |   1 +
 libavformat/version.h   |   4 +-
 8 files changed, 357 insertions(+), 2 deletions(-)
 create mode 100644 libavformat/libamqp.c

diff --git a/Changelog b/Changelog
index cb310a3abc2..ab30d670a15 100644
--- a/Changelog
+++ b/Changelog
@@ -43,6 +43,7 @@ version <next>:
 - Rayman 2 ADPCM decoder
 - Rayman 2 APM demuxer
 - cas video filter
+- AMQP 0-9-1 protocol (RabbitMQ)


 version 4.2:
diff --git a/configure b/configure
index 06e3a7b2a88..8b171349440 100755
--- a/configure
+++ b/configure
@@ -255,6 +255,7 @@ External library support:
   --enable-libopenmpt      enable decoding tracked files via libopenmpt
[no]
   --enable-libopus         enable Opus de/encoding via libopus [no]
   --enable-libpulse        enable Pulseaudio input via libpulse [no]
+  --enable-librabbitmq     enable RabbitMQ library [no]
   --enable-librav1e        enable AV1 encoding via rav1e [no]
   --enable-librsvg         enable SVG rasterization via librsvg [no]
   --enable-librubberband   enable rubberband needed for rubberband filter
[no]
@@ -1789,6 +1790,7 @@ EXTERNAL_LIBRARY_LIST="
     libopenmpt
     libopus
     libpulse
+    librabbitmq
     librav1e
     librsvg
     librtmp
@@ -3434,6 +3436,8 @@ unix_protocol_deps="sys_un_h"
 unix_protocol_select="network"

 # external library protocols
+libamqp_protocol_deps="librabbitmq"
+libamqp_protocol_select="network"
 librtmp_protocol_deps="librtmp"
 librtmpe_protocol_deps="librtmp"
 librtmps_protocol_deps="librtmp"
@@ -6317,6 +6321,7 @@ enabled libopus           && {
     }
 }
 enabled libpulse          && require_pkg_config libpulse libpulse
pulse/pulseaudio.h pa_context_new
+enabled librabbitmq       && require_pkg_config librabbitmq "librabbitmq >=
0.7.1" amqp.h amqp_new_connection
 enabled librav1e          && require_pkg_config librav1e "rav1e >= 0.1.0"
rav1e.h rav1e_context_new
 enabled librsvg           && require_pkg_config librsvg librsvg-2.0
librsvg-2.0/librsvg/rsvg.h rsvg_handle_render_cairo
 enabled librtmp           && require_pkg_config librtmp librtmp
librtmp/rtmp.h RTMP_Socket
diff --git a/doc/general.texi b/doc/general.texi
index dbdc3485982..623566dabea 100644
--- a/doc/general.texi
+++ b/doc/general.texi
@@ -1330,6 +1330,7 @@ performance on systems without hardware floating point
support).

 @multitable @columnfractions .4 .1
 @item Name         @tab Support
+@item AMQP         @tab X
 @item file         @tab X
 @item FTP          @tab X
 @item Gopher       @tab X
diff --git a/doc/protocols.texi b/doc/protocols.texi
index 54a287f488b..dc5f49ba8cc 100644
--- a/doc/protocols.texi
+++ b/doc/protocols.texi
@@ -51,6 +51,66 @@ in microseconds.

 A description of the currently available protocols follows.

+@section amqp
+
+Advanced Message Queueing Protocol (AMQP) version 0-9-1 is a broker based
+publish-subscribe communication protocol.
+
+FFmpeg must be compiled with --enable-librabbitmq to support AMQP. A
separate
+AMQP broker must also be run. An example open-source AMQP broker is
RabbitMQ.
+
+After starting the broker, an FFmpeg client may stream data to the broker
using
+the command:
+
+@example
+ffmpeg -re -i input -f mpegts amqp://[[user]:[password]@@]hostname[:port]
+@end example
+
+Where hostname and port (default is 5672) is the address of the broker. The
+client may also set a user/password for authentication. The default for
both
+fields is "guest".
+
+Muliple subscribers may stream from the broker using the command:
+@example
+ffplay amqp://[[user]:[password]@@]hostname[:port]
+@end example
+
+In RabbitMQ all data published to the broker flows through a specific
exchange,
+and each subscribing client has an assigned queue/buffer. When a packet
arrives
+at an exchange, it may be copied to a client's queue depending on the
exchange
+and routing_key fields.
+
+The following options are supported:
+
+@table @option
+
+@item exchange
+Sets the exchange to use on the broker. RabbitMQ has several predefined
+exchanges: "amq.direct" is the default exchange, where the publisher and
+subscriber must have a matching routing_key; "amq.fanout" is the same as a
+broadcast operation (i.e. the data is forwarded to all queues on the fanout
+exchange independent of the routing_key); and "amq.topic" is similar to
+"amq.direct", but allows for more complex pattern matching (refer to the
RabbitMQ
+documentation).
+
+@item routing_key
+Sets the routing key. The default value is "amqp". The routing key is used
on
+the "amq.direct" and "amq.topic" exchanges to decide whether packets are
written
+to the queue of a subscriber.
+
+@item pkt_size
+Maximum size of each packet sent/received to the broker. Default is 131072.
+Minimum is 4096 and max is any large value (representable by an int). When
+receiving packets, this sets an internal buffer size in FFmpeg. It should
be
+equal to or greater than the size of the published packets to the broker.
Otherwise
+the received message may be truncated causing decoding errors.
+
+@item connection_timeout
+The timeout in microseconds during the initial connection to the broker.
The
+default value is rw_timeout, or 5000000 microseconds if rw_timeout is not
set.
+
+@end table
+
 @section async

 Asynchronous data filling wrapper for input stream.
diff --git a/libavformat/Makefile b/libavformat/Makefile
index e0681058a29..52b44289fee 100644
--- a/libavformat/Makefile
+++ b/libavformat/Makefile
@@ -629,6 +629,7 @@ OBJS-$(CONFIG_UDPLITE_PROTOCOL)          += udp.o ip.o
 OBJS-$(CONFIG_UNIX_PROTOCOL)             += unix.o

 # external library protocols
+OBJS-$(CONFIG_LIBAMQP_PROTOCOL)          += libamqp.o
 OBJS-$(CONFIG_LIBRTMP_PROTOCOL)          += librtmp.o
 OBJS-$(CONFIG_LIBRTMPE_PROTOCOL)         += librtmp.o
 OBJS-$(CONFIG_LIBRTMPS_PROTOCOL)         += librtmp.o
diff --git a/libavformat/libamqp.c b/libavformat/libamqp.c
new file mode 100644
index 00000000000..baa42306bb0
--- /dev/null
+++ b/libavformat/libamqp.c
@@ -0,0 +1,286 @@
+/*
+ * Advanced Message Queuing Protocol (AMQP) 0-9-1
+ * Copyright (c) 2020 Andriy Gelman
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
USA
+ */
+
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+#include <sys/time.h>
+#include "avformat.h"
+#include "libavformat/urldecode.h"
+#include "libavutil/avstring.h"
+#include "libavutil/opt.h"
+#include "libavutil/time.h"
+#include "network.h"
+#include "url.h"
+
+typedef struct AMQPContext {
+    const AVClass *class;
+    amqp_connection_state_t conn;
+    amqp_socket_t *socket;
+    const char *exchange;
+    const char *routing_key;
+    int pkt_size;
+    int64_t connection_timeout;
+    int pkt_size_overflow;
+} AMQPContext;
+
+#define STR_LEN           1024
+#define DEFAULT_CHANNEL   1
+
+#define OFFSET(x) offsetof(AMQPContext, x)
+#define D AV_OPT_FLAG_DECODING_PARAM
+#define E AV_OPT_FLAG_ENCODING_PARAM
+static const AVOption options[] = {
+    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
+    { "exchange", "Exchange to send/read packets", OFFSET(exchange),
AV_OPT_TYPE_STRING, { .str = "amq.direct" }, 0, 0, .flags = D | E },
+    { "routing_key", "Key to filter streams", OFFSET(routing_key),
AV_OPT_TYPE_STRING, { .str = "amqp" }, 0, 0, .flags = D | E },
+    { "connection_timeout", "Initial connection timeout",
OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1,
INT_MAX, .flags = D | E},
+    { NULL }
+};
+
+static int amqp_proto_open(URLContext *h, const char *uri, int flags)
+{
+    int ret, server_msg;
+    char hostname[STR_LEN], credentials[STR_LEN];
+    char *credentials_decoded;
+    int port;
+    const char *user, *password = NULL;
+    char *p;
+    amqp_rpc_reply_t broker_reply;
+    struct timeval tval = { 0 };
+
+    AMQPContext *s = h->priv_data;
+
+    h->is_streamed     = 1;
+    h->max_packet_size = s->pkt_size;
+
+    av_url_split(NULL, 0, credentials, sizeof(credentials),
+                 hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    if (port < 0)
+        port = 5672;
+
+    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
+        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
+        return AVERROR(EINVAL);
+    }
+
+    credentials_decoded = ff_urldecode(credentials, 0);
+    if (!credentials_decoded)
+        return AVERROR(ENOMEM);
+
+    p = strchr(credentials_decoded, ':');
+    if (p) {
+        *p = '\0';
+        password = p + 1;
+    }
+
+    if (!password || *password == '\0')
+        password = "guest";
+
+    user = credentials_decoded;
+    if (*user == '\0')
+        user = "guest";
+
+    s->conn = amqp_new_connection();
+    if (!s->conn) {
+        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    s->socket = amqp_tcp_socket_new(s->conn);
+    if (!s->socket) {
+        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
+        goto destroy_connection;
+    }
+
+    if (s->connection_timeout < 0)
+        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout :
5000000);
+
+    tval.tv_sec  = s->connection_timeout / 1000000;
+    tval.tv_usec = s->connection_timeout % 1000000;
+    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
+        goto destroy_connection;
+    }
+
+    broker_reply = amqp_login(s->conn, "/", 0, s->pkt_size, 0,
+                              AMQP_SASL_METHOD_PLAIN, user, password);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error login\n");
+        server_msg = AMQP_ACCESS_REFUSED;
+        goto close_connection;
+    }
+
+    amqp_channel_open(s->conn, DEFAULT_CHANNEL);
+    broker_reply = amqp_get_rpc_reply(s->conn);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+        av_log(h, AV_LOG_ERROR, "Error set channel\n");
+        server_msg = AMQP_CHANNEL_ERROR;
+        goto close_connection;
+    }
+
+    if (h->flags & AVIO_FLAG_READ) {
+        amqp_bytes_t queuename;
+        char queuename_buff[STR_LEN];
+        amqp_queue_declare_ok_t *r;
+
+        r = amqp_queue_declare(s->conn, DEFAULT_CHANNEL, amqp_empty_bytes,
+                               0, 0, 0, 1, amqp_empty_table);
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (!r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Error declare queue\n");
+            server_msg = AMQP_RESOURCE_ERROR;
+            goto close_channel;
+        }
+
+        /* store queuename */
+        queuename.bytes = queuename_buff;
+        queuename.len = FFMIN(r->queue.len, STR_LEN);
+        memcpy(queuename.bytes, r->queue.bytes, queuename.len);
+
+        amqp_queue_bind(s->conn, DEFAULT_CHANNEL, queuename,
amqp_cstring_bytes(s->exchange),
+                        amqp_cstring_bytes(s->routing_key),
amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Queue bind error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+
+        amqp_basic_consume(s->conn, DEFAULT_CHANNEL, queuename,
amqp_empty_bytes,
+                           0, 1, 0, amqp_empty_table);
+
+        broker_reply = amqp_get_rpc_reply(s->conn);
+        if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
+            av_log(h, AV_LOG_ERROR, "Set consume error\n");
+            server_msg = AMQP_INTERNAL_ERROR;
+            goto close_channel;
+        }
+    }
+
+    av_freep(&credentials_decoded);
+    return 0;
+
+close_channel:
+    amqp_channel_close(s->conn, DEFAULT_CHANNEL, server_msg);
+close_connection:
+    amqp_connection_close(s->conn, server_msg);
+destroy_connection:
+    amqp_destroy_connection(s->conn);
+
+    av_freep(&credentials_decoded);
+    return AVERROR_EXTERNAL;
+}
+
+static int amqp_proto_write(URLContext *h, const unsigned char *buf, int
size)
+{
+    int ret;
+    AMQPContext *s = h->priv_data;
+    int fd = amqp_socket_get_sockfd(s->socket);
+
+    amqp_bytes_t message = { size, (void *)buf };
+    amqp_basic_properties_t props;
+
+    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout,
&h->interrupt_callback);
+    if (ret)
+        return ret;
+
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG;
+    props.content_type = amqp_cstring_bytes("octet/stream");
+    props.delivery_mode = 2; /* persistent delivery mode */
+
+    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL,
amqp_cstring_bytes(s->exchange),
+                             amqp_cstring_bytes(s->routing_key), 0, 0,
+                             &props, message);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error publish\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    return size;
+}
+
+static int amqp_proto_read(URLContext *h, unsigned char *buf, int size)
+{
+    AMQPContext *s = h->priv_data;
+    int fd = amqp_socket_get_sockfd(s->socket);
+    int ret;
+
+    amqp_rpc_reply_t broker_reply;
+    amqp_envelope_t envelope;
+
+    ret = ff_network_wait_fd_timeout(fd, 0, h->rw_timeout,
&h->interrupt_callback);
+    if (ret)
+        return ret;
+
+    amqp_maybe_release_buffers(s->conn);
+    broker_reply = amqp_consume_message(s->conn, &envelope, NULL, 0);
+
+    if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
+        return AVERROR_EXTERNAL;
+
+    if (envelope.message.body.len > size) {
+        s->pkt_size_overflow = FFMAX(s->pkt_size_overflow,
envelope.message.body.len);
+        av_log(h, AV_LOG_WARNING, "Message exceeds space in the buffer. "
+                                  "Message will be truncated. Setting
-pkt_size %d "
+                                  "may resolve this issue.\n",
s->pkt_size_overflow);
+    }
+    size = FFMIN(size, envelope.message.body.len);
+
+    memcpy(buf, envelope.message.body.bytes, size);
+    amqp_destroy_envelope(&envelope);
+
+    return size;
+}
+
+static int amqp_proto_close(URLContext *h)
+{
+    AMQPContext *s = h->priv_data;
+    amqp_channel_close(s->conn, DEFAULT_CHANNEL, AMQP_REPLY_SUCCESS);
+    amqp_connection_close(s->conn, AMQP_REPLY_SUCCESS);
+    amqp_destroy_connection(s->conn);
+
+    return 0;
+}
+
+static const AVClass amqp_context_class = {
+    .class_name = "amqp",
+    .item_name  = av_default_item_name,
+    .option     = options,
+    .version    = LIBAVUTIL_VERSION_INT,
+};
+
+const URLProtocol ff_libamqp_protocol = {
+    .name            = "amqp",
+    .url_close       = amqp_proto_close,
+    .url_open        = amqp_proto_open,
+    .url_read        = amqp_proto_read,
+    .url_write       = amqp_proto_write,
+    .priv_data_size  = sizeof(AMQPContext),
+    .priv_data_class = &amqp_context_class,
+    .flags           = URL_PROTOCOL_FLAG_NETWORK,
+};
diff --git a/libavformat/protocols.c b/libavformat/protocols.c
index 29fb99e7fa3..f1b8eab0fd6 100644
--- a/libavformat/protocols.c
+++ b/libavformat/protocols.c
@@ -60,6 +60,7 @@ extern const URLProtocol ff_tls_protocol;
 extern const URLProtocol ff_udp_protocol;
 extern const URLProtocol ff_udplite_protocol;
 extern const URLProtocol ff_unix_protocol;
+extern const URLProtocol ff_libamqp_protocol;
 extern const URLProtocol ff_librtmp_protocol;
 extern const URLProtocol ff_librtmpe_protocol;
 extern const URLProtocol ff_librtmps_protocol;
diff --git a/libavformat/version.h b/libavformat/version.h
index 4724269b3c4..a233b673512 100644
--- a/libavformat/version.h
+++ b/libavformat/version.h
@@ -32,8 +32,8 @@
 // Major bumping may affect Ticket5467, 5421, 5451(compatibility with
Chromium)
 // Also please add any ticket numbers that you believe might be affected
here
 #define LIBAVFORMAT_VERSION_MAJOR  58
-#define LIBAVFORMAT_VERSION_MINOR  39
-#define LIBAVFORMAT_VERSION_MICRO 101
+#define LIBAVFORMAT_VERSION_MINOR  40
+#define LIBAVFORMAT_VERSION_MICRO 100

 #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
                                                LIBAVFORMAT_VERSION_MINOR, \
--
2.25.0

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".

Reply via email to