Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1 
protocol support

From: Andriy Gelman <andriy.gel...@gmail.com>

Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.

Signed-off-by: Andriy Gelman <andriy.gel...@gmail.com>
---

Changes in v2:
    - Addressed comments from Marton
    - Updated documentation

Compilation notes:
    - Requires librabbitmq-dev package (on ubuntu).
    - The pkg-config librabbitmq.pc has a corrupt entry.
      **update: fixed on the github master branch**
      The line "Libs.private: rt; -lpthread" should be changed to
      "Libs.private: -lrt -pthread".
    - Compile FFmpeg with --enable-librabbitmq


[...]

+@item connection_timeout
+The timeout in microseconds during the initial connection to the broker. The

In *seconds* (because it is an AV_OPT_TYPE_DURATION)

+default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.

5 seconds

[...]

+static const AVOption options[] = {
+    { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), 
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
+    { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING, 
{ .str = "amq.direct" }, 0, 0, .flags = D | E },
+    { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, { 
.str = "amqp" }, 0, 0, .flags = D | E },
+    { "connection_timeout", "Initial connection timeout", 
OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},

INT64_MAX can be the maximum.

+    { NULL }
+};
+
+static int amqp_proto_open(URLContext *h, const char *uri, int flags)
+{
+    int ret, server_msg;
+    char hostname[STR_LEN], credentials[STR_LEN];
+    char *credentials_decoded;
+    int port;
+    const char *user, *password = NULL;
+    char *p;
+    amqp_rpc_reply_t broker_reply;
+    struct timeval tval = { 0 };
+
+    AMQPContext *s = h->priv_data;
+
+    h->is_streamed     = 1;
+    h->max_packet_size = s->pkt_size;
+
+    av_url_split(NULL, 0, credentials, sizeof(credentials),
+                 hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+    if (port < 0)
+        port = 5672;
+
+    if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
+        av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
+        return AVERROR(EINVAL);
+    }
+
+    credentials_decoded = ff_urldecode(credentials, 0);

This is not entirely correct, because the username may contain ':'
characters... So you should split first and urldecode the split
components...

+    if (!credentials_decoded)
+        return AVERROR(ENOMEM);
+
+    p = strchr(credentials_decoded, ':');
+    if (p) {
+        *p = '\0';
+        password = p + 1;
+    }
+
+    if (!password || *password == '\0')
+        password = "guest";
+
+    user = credentials_decoded;
+    if (*user == '\0')
+        user = "guest";
+
+    s->conn = amqp_new_connection();
+    if (!s->conn) {
+        av_log(h, AV_LOG_ERROR, "Error creating connection\n");
+        return AVERROR_EXTERNAL;
+    }
+
+    s->socket = amqp_tcp_socket_new(s->conn);
+    if (!s->socket) {
+        av_log(h, AV_LOG_ERROR, "Error creating socket\n");
+        goto destroy_connection;
+    }
+
+    if (s->connection_timeout < 0)
+        s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
+
+    tval.tv_sec  = s->connection_timeout / 1000000;
+    tval.tv_usec = s->connection_timeout % 1000000;
+    ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error connecting to server\n");

This should log the useful error, e.g:
        av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n", 
amqp_error_string2(ret));

[...]

+static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+    int ret;
+    AMQPContext *s = h->priv_data;
+    int fd = amqp_socket_get_sockfd(s->socket);
+
+    amqp_bytes_t message = { size, (void *)buf };
+    amqp_basic_properties_t props;
+
+    ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout, 
&h->interrupt_callback);
+    if (ret)
+        return ret;
+
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | 
AMQP_BASIC_DELIVERY_MODE_FLAG;
+    props.content_type = amqp_cstring_bytes("octet/stream");
+    props.delivery_mode = 2; /* persistent delivery mode */
+
+    ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL, 
amqp_cstring_bytes(s->exchange),
+                             amqp_cstring_bytes(s->routing_key), 0, 0,
+                             &props, message);
+
+    if (ret) {
+        av_log(h, AV_LOG_ERROR, "Error publish\n");

Same here: the log message should include the useful error string, e.g. via
amqp_error_string2(ret).

+        return AVERROR_EXTERNAL;
+    }
+
+    return size;
+}
+

[...]

Thanks,
Marton

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".

Reply via email to