This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aabfc8eb78 [Feature][Rabbitmq] Allow configuration of queue durability 
and deletion policy (#7365)
aabfc8eb78 is described below

commit aabfc8eb78dd33ae479e9ea82bac8371cf78092d
Author: Logic <zqr10...@126.com>
AuthorDate: Thu Aug 29 21:05:25 2024 +0800

    [Feature][Rabbitmq] Allow configuration of queue durability and deletion 
policy (#7365)
---
 docs/en/connector-v2/sink/Rabbitmq.md              | 39 ++++++++++++++++++++++
 .../seatunnel/rabbitmq/client/RabbitmqClient.java  | 11 ++++--
 .../seatunnel/rabbitmq/config/RabbitmqConfig.java  | 36 ++++++++++++++++++++
 .../e2e/connector/rabbitmq/RabbitmqIT.java         |  9 +++++
 .../src/test/resources/rabbitmq-to-rabbitmq.conf   |  6 ++++
 5 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/sink/Rabbitmq.md 
b/docs/en/connector-v2/sink/Rabbitmq.md
index 489287249e..c7963525fb 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -57,6 +57,21 @@ convenience method for setting the fields in an AMQP URI: 
host, port, username,
 
 the queue to write the message to
 
+### durable [boolean]
+
+true: The queue will survive a server restart.
+false: The queue will be deleted on server restart.
+
+### exclusive [boolean]
+
+true: The queue is used only by the current connection and will be deleted 
when the connection closes.
+false: The queue can be used by multiple connections.
+
+### auto_delete [boolean]
+
+true: The queue will be deleted automatically when the last consumer 
unsubscribes.
+false: The queue will not be automatically deleted.
+
 ### schema [Config]
 
 #### fields [Config]
@@ -112,6 +127,30 @@ sink {
 }
 ```
 
+### Example 2
+
+queue with durable, exclusive, auto_delete:
+
+```hocon
+sink {
+      RabbitMQ {
+          host = "rabbitmq-e2e"
+          port = 5672
+          virtual_host = "/"
+          username = "guest"
+          password = "guest"
+          queue_name = "test1"
+          durable = "true"
+          exclusive = "false"
+          auto_delete = "false"
+          rabbitmq.config = {
+            requested-heartbeat = 10
+            connection-timeout = 10
+          }
+      }
+}
+```
+
 ## Changelog
 
 ### next version
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
index 82ae2728d6..3f5c862cad 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java
@@ -189,11 +189,16 @@ public class RabbitmqClient {
 
     protected void setupQueue() throws IOException {
         if (config.getQueueName() != null) {
-            declareQueueDefaults(channel, config.getQueueName());
+            declareQueueDefaults(channel, config);
         }
     }
 
-    private void declareQueueDefaults(Channel channel, String queueName) 
throws IOException {
-        channel.queueDeclare(queueName, true, false, false, null);
+    private void declareQueueDefaults(Channel channel, RabbitmqConfig config) 
throws IOException {
+        channel.queueDeclare(
+                config.getQueueName(),
+                config.getDurable(),
+                config.getExclusive(),
+                config.getAutoDelete(),
+                null);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index e8e2ce55c3..8475817457 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -53,6 +53,9 @@ public class RabbitmqConfig implements Serializable {
     private Integer prefetchCount;
     private long deliveryTimeout;
     private String queueName;
+    private Boolean durable;
+    private Boolean exclusive;
+    private Boolean autoDelete;
     private String routingKey;
     private boolean logFailuresOnly = false;
     private String exchange = "";
@@ -195,6 +198,30 @@ public class RabbitmqConfig implements Serializable {
                             "Whether the messages received are supplied with a 
unique"
                                     + "id to deduplicate messages (in case of 
failed acknowledgments).");
 
+    public static final Option<Boolean> DURABLE =
+            Options.key("durable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "true: The queue will survive a server restart."
+                                    + "false: The queue will be deleted on 
server restart.");
+
+    public static final Option<Boolean> EXCLUSIVE =
+            Options.key("exclusive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "true: The queue is used only by the current 
connection and will be deleted when the connection closes."
+                                    + "false: The queue can be used by 
multiple connections.");
+
+    public static final Option<Boolean> AUTO_DELETE =
+            Options.key("auto_delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "true: The queue will be deleted automatically 
when the last consumer unsubscribes."
+                                    + "false: The queue will not be 
automatically deleted.");
+
     private void parseSinkOptionProperties(Config pluginConfig) {
         if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) 
{
             pluginConfig
@@ -259,6 +286,15 @@ public class RabbitmqConfig implements Serializable {
         if (config.hasPath(USE_CORRELATION_ID.key())) {
             this.usesCorrelationId = 
config.getBoolean(USE_CORRELATION_ID.key());
         }
+        if (config.hasPath(DURABLE.key())) {
+            this.durable = config.getBoolean(DURABLE.key());
+        }
+        if (config.hasPath(EXCLUSIVE.key())) {
+            this.exclusive = config.getBoolean(EXCLUSIVE.key());
+        }
+        if (config.hasPath(AUTO_DELETE.key())) {
+            this.autoDelete = config.getBoolean(AUTO_DELETE.key());
+        }
         parseSinkOptionProperties(config);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
index 7052aa9bef..a846949d85 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
@@ -75,6 +75,9 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
     private static final String SINK_QUEUE_NAME = "test1";
     private static final String USERNAME = "guest";
     private static final String PASSWORD = "guest";
+    private static final Boolean DURABLE = true;
+    private static final Boolean EXCLUSIVE = false;
+    private static final Boolean AUTO_DELETE = false;
 
     private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>> 
TEST_DATASET =
             generateTestDataSet();
@@ -185,6 +188,9 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
             config.setVirtualHost("/");
             config.setUsername(USERNAME);
             config.setPassword(PASSWORD);
+            config.setDurable(DURABLE);
+            config.setExclusive(EXCLUSIVE);
+            config.setAutoDelete(AUTO_DELETE);
             rabbitmqClient = new RabbitmqClient(config);
         } catch (Exception e) {
             throw new RuntimeException("init Rabbitmq error", e);
@@ -201,6 +207,9 @@ public class RabbitmqIT extends TestSuiteBase implements 
TestResource {
             config.setVirtualHost("/");
             config.setUsername(USERNAME);
             config.setPassword(PASSWORD);
+            config.setDurable(DURABLE);
+            config.setExclusive(EXCLUSIVE);
+            config.setAutoDelete(AUTO_DELETE);
             return new RabbitmqClient(config);
         } catch (Exception e) {
             throw new RuntimeException("init Rabbitmq error", e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
index b3a834bdc2..61267a3adc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf
@@ -28,6 +28,9 @@ source {
     username = "guest"
     password = "guest"
     queue_name = "test"
+    durable = "true"
+    exclusive = "false"
+    auto_delete = "false"
     for_e2e_testing = true
     schema = {
       fields {
@@ -61,6 +64,9 @@ sink {
     virtual_host = "/"
     username = "guest"
     password = "guest"
+    durable = "true"
+    exclusive = "false"
+    auto_delete = "false"
     queue_name = "test1"
   }
 }
\ No newline at end of file

Reply via email to