This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new aabfc8eb78 [Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) aabfc8eb78 is described below commit aabfc8eb78dd33ae479e9ea82bac8371cf78092d Author: Logic <zqr10...@126.com> AuthorDate: Thu Aug 29 21:05:25 2024 +0800 [Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) --- docs/en/connector-v2/sink/Rabbitmq.md | 39 ++++++++++++++++++++++ .../seatunnel/rabbitmq/client/RabbitmqClient.java | 11 ++++-- .../seatunnel/rabbitmq/config/RabbitmqConfig.java | 36 ++++++++++++++++++++ .../e2e/connector/rabbitmq/RabbitmqIT.java | 9 +++++ .../src/test/resources/rabbitmq-to-rabbitmq.conf | 6 ++++ 5 files changed, 98 insertions(+), 3 deletions(-) diff --git a/docs/en/connector-v2/sink/Rabbitmq.md b/docs/en/connector-v2/sink/Rabbitmq.md index 489287249e..c7963525fb 100644 --- a/docs/en/connector-v2/sink/Rabbitmq.md +++ b/docs/en/connector-v2/sink/Rabbitmq.md @@ -57,6 +57,21 @@ convenience method for setting the fields in an AMQP URI: host, port, username, the queue to write the message to +### durable [boolean] + +true: The queue will survive a server restart. +false: The queue will be deleted on server restart. + +### exclusive [boolean] + +true: The queue is used only by the current connection and will be deleted when the connection closes. +false: The queue can be used by multiple connections. + +### auto_delete [boolean] + +true: The queue will be deleted automatically when the last consumer unsubscribes. +false: The queue will not be automatically deleted. + ### schema [Config] #### fields [Config] @@ -112,6 +127,30 @@ sink { } ``` +### Example 2 + +queue with durable, exclusive, auto_delete: + +```hocon +sink { + RabbitMQ { + host = "rabbitmq-e2e" + port = 5672 + virtual_host = "/" + username = "guest" + password = "guest" + queue_name = "test1" + durable = "true" + exclusive = "false" + auto_delete = "false" + rabbitmq.config = { + requested-heartbeat = 10 + connection-timeout = 10 + } + } +} +``` + ## Changelog ### next version diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java index 82ae2728d6..3f5c862cad 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java @@ -189,11 +189,16 @@ public class RabbitmqClient { protected void setupQueue() throws IOException { if (config.getQueueName() != null) { - declareQueueDefaults(channel, config.getQueueName()); + declareQueueDefaults(channel, config); } } - private void declareQueueDefaults(Channel channel, String queueName) throws IOException { - channel.queueDeclare(queueName, true, false, false, null); + private void declareQueueDefaults(Channel channel, RabbitmqConfig config) throws IOException { + channel.queueDeclare( + config.getQueueName(), + config.getDurable(), + config.getExclusive(), + config.getAutoDelete(), + null); } } diff --git a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java index e8e2ce55c3..8475817457 100644 --- a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java +++ b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java @@ -53,6 +53,9 @@ public class RabbitmqConfig implements Serializable { private Integer prefetchCount; private long deliveryTimeout; private String queueName; + private Boolean durable; + private Boolean exclusive; + private Boolean autoDelete; private String routingKey; private boolean logFailuresOnly = false; private String exchange = ""; @@ -195,6 +198,30 @@ public class RabbitmqConfig implements Serializable { "Whether the messages received are supplied with a unique" + "id to deduplicate messages (in case of failed acknowledgments)."); + public static final Option<Boolean> DURABLE = + Options.key("durable") + .booleanType() + .defaultValue(true) + .withDescription( + "true: The queue will survive a server restart." + + "false: The queue will be deleted on server restart."); + + public static final Option<Boolean> EXCLUSIVE = + Options.key("exclusive") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue is used only by the current connection and will be deleted when the connection closes." + + "false: The queue can be used by multiple connections."); + + public static final Option<Boolean> AUTO_DELETE = + Options.key("auto_delete") + .booleanType() + .defaultValue(false) + .withDescription( + "true: The queue will be deleted automatically when the last consumer unsubscribes." + + "false: The queue will not be automatically deleted."); + private void parseSinkOptionProperties(Config pluginConfig) { if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) { pluginConfig @@ -259,6 +286,15 @@ public class RabbitmqConfig implements Serializable { if (config.hasPath(USE_CORRELATION_ID.key())) { this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key()); } + if (config.hasPath(DURABLE.key())) { + this.durable = config.getBoolean(DURABLE.key()); + } + if (config.hasPath(EXCLUSIVE.key())) { + this.exclusive = config.getBoolean(EXCLUSIVE.key()); + } + if (config.hasPath(AUTO_DELETE.key())) { + this.autoDelete = config.getBoolean(AUTO_DELETE.key()); + } parseSinkOptionProperties(config); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java index 7052aa9bef..a846949d85 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java @@ -75,6 +75,9 @@ public class RabbitmqIT extends TestSuiteBase implements TestResource { private static final String SINK_QUEUE_NAME = "test1"; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; + private static final Boolean DURABLE = true; + private static final Boolean EXCLUSIVE = false; + private static final Boolean AUTO_DELETE = false; private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET = generateTestDataSet(); @@ -185,6 +188,9 @@ public class RabbitmqIT extends TestSuiteBase implements TestResource { config.setVirtualHost("/"); config.setUsername(USERNAME); config.setPassword(PASSWORD); + config.setDurable(DURABLE); + config.setExclusive(EXCLUSIVE); + config.setAutoDelete(AUTO_DELETE); rabbitmqClient = new RabbitmqClient(config); } catch (Exception e) { throw new RuntimeException("init Rabbitmq error", e); @@ -201,6 +207,9 @@ public class RabbitmqIT extends TestSuiteBase implements TestResource { config.setVirtualHost("/"); config.setUsername(USERNAME); config.setPassword(PASSWORD); + config.setDurable(DURABLE); + config.setExclusive(EXCLUSIVE); + config.setAutoDelete(AUTO_DELETE); return new RabbitmqClient(config); } catch (Exception e) { throw new RuntimeException("init Rabbitmq error", e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf index b3a834bdc2..61267a3adc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf @@ -28,6 +28,9 @@ source { username = "guest" password = "guest" queue_name = "test" + durable = "true" + exclusive = "false" + auto_delete = "false" for_e2e_testing = true schema = { fields { @@ -61,6 +64,9 @@ sink { virtual_host = "/" username = "guest" password = "guest" + durable = "true" + exclusive = "false" + auto_delete = "false" queue_name = "test1" } } \ No newline at end of file