Subject: [FFmpeg-devel] [PATCH v2 1/2] avformat: Add AMQP version 0-9-1
protocol support
From: Andriy Gelman <andriy.gel...@gmail.com>
Supports connecting to a RabbitMQ broker via AMQP version 0-9-1.
Signed-off-by: Andriy Gelman <andriy.gel...@gmail.com>
---
Changes in v2:
- Addressed comments from Marton
- Updated documentation
Compilation notes:
- Requires librabbitmq-dev package (on ubuntu).
- The pkg-config libprabbitmq.pc has a corrupt entry.
**update: fixed on the github master branch**
The line "Libs.private: rt; -lpthread" should be changed to
"Libs.private: -lrt -pthread".
- Compile FFmpeg with --enable-librabbitmq
[...]
+@item connection_timeout
+The timeout in microseconds during the initial connection to the broker. The
In *seconds* (because it is an AV_OPT_TYPE_DURATION)
+default value is rw_timeout, or 5000000 microseconds if rw_timeout is not set.
5 seconds
[...]
+static const AVOption options[] = {
+ { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags = D | E },
+ { "exchange", "Exchange to send/read packets", OFFSET(exchange), AV_OPT_TYPE_STRING,
{ .str = "amq.direct" }, 0, 0, .flags = D | E },
+ { "routing_key", "Key to filter streams", OFFSET(routing_key), AV_OPT_TYPE_STRING, {
.str = "amqp" }, 0, 0, .flags = D | E },
+ { "connection_timeout", "Initial connection timeout",
OFFSET(connection_timeout), AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT_MAX, .flags = D | E},
INT64_MAX can be the maximum.
+ { NULL }
+};
+
+static int amqp_proto_open(URLContext *h, const char *uri, int flags)
+{
+ int ret, server_msg;
+ char hostname[STR_LEN], credentials[STR_LEN];
+ char *credentials_decoded;
+ int port;
+ const char *user, *password = NULL;
+ char *p;
+ amqp_rpc_reply_t broker_reply;
+ struct timeval tval = { 0 };
+
+ AMQPContext *s = h->priv_data;
+
+ h->is_streamed = 1;
+ h->max_packet_size = s->pkt_size;
+
+ av_url_split(NULL, 0, credentials, sizeof(credentials),
+ hostname, sizeof(hostname), &port, NULL, 0, uri);
+
+ if (port < 0)
+ port = 5672;
+
+ if (hostname[0] == '\0' || port <= 0 || port > 65535 ) {
+ av_log(h, AV_LOG_ERROR, "Invalid hostname/port\n");
+ return AVERROR(EINVAL);
+ }
+
+ credentials_decoded = ff_urldecode(credentials, 0);
This is not entirely correct, becase the username may contain ':'
characters... So you should split first and urldecode the splitted
components...
+ if (!credentials_decoded)
+ return AVERROR(ENOMEM);
+
+ p = strchr(credentials_decoded, ':');
+ if (p) {
+ *p = '\0';
+ password = p + 1;
+ }
+
+ if (!password || *password == '\0')
+ password = "guest";
+
+ user = credentials_decoded;
+ if (*user == '\0')
+ user = "guest";
+
+ s->conn = amqp_new_connection();
+ if (!s->conn) {
+ av_log(h, AV_LOG_ERROR, "Error creating connection\n");
+ return AVERROR_EXTERNAL;
+ }
+
+ s->socket = amqp_tcp_socket_new(s->conn);
+ if (!s->socket) {
+ av_log(h, AV_LOG_ERROR, "Error creating socket\n");
+ goto destroy_connection;
+ }
+
+ if (s->connection_timeout < 0)
+ s->connection_timeout = (h->rw_timeout > 0 ? h->rw_timeout : 5000000);
+
+ tval.tv_sec = s->connection_timeout / 1000000;
+ tval.tv_usec = s->connection_timeout % 1000000;
+ ret = amqp_socket_open_noblock(s->socket, hostname, port, &tval);
+
+ if (ret) {
+ av_log(h, AV_LOG_ERROR, "Error connecting to server\n");
This should log the useful error, e.g:
av_log(h, AV_LOG_ERROR, "Error connecting to server: %s\n",
amqp_error_string2(ret));
[...]
+static int amqp_proto_write(URLContext *h, const unsigned char *buf, int size)
+{
+ int ret;
+ AMQPContext *s = h->priv_data;
+ int fd = amqp_socket_get_sockfd(s->socket);
+
+ amqp_bytes_t message = { size, (void *)buf };
+ amqp_basic_properties_t props;
+
+ ret = ff_network_wait_fd_timeout(fd, 1, h->rw_timeout,
&h->interrupt_callback);
+ if (ret)
+ return ret;
+
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG;
+ props.content_type = amqp_cstring_bytes("octet/stream");
+ props.delivery_mode = 2; /* persistent delivery mode */
+
+ ret = amqp_basic_publish(s->conn, DEFAULT_CHANNEL,
amqp_cstring_bytes(s->exchange),
+ amqp_cstring_bytes(s->routing_key), 0, 0,
+ &props, message);
+
+ if (ret) {
+ av_log(h, AV_LOG_ERROR, "Error publish\n");
Same here
+ return AVERROR_EXTERNAL;
+ }
+
+ return size;
+}
+
[...]
Thanks,
Marton
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".