This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5f9492e62a [Fix][connector-rabbitmq] Set default value for durable,
exclusive and auto-delete (#9631)
5f9492e62a is described below
commit 5f9492e62a72d0f6e4a03b33c7e1da6f5057d33e
Author: ZHANG YINGHONG <[email protected]>
AuthorDate: Wed Jul 30 10:03:02 2025 +0800
[Fix][connector-rabbitmq] Set default value for durable, exclusive and
auto-delete (#9631)
---
.dlc.json | 1 +
docs/en/connector-v2/sink/Rabbitmq.md | 19 +++++++
docs/en/connector-v2/source/Rabbitmq.md | 22 +++++++-
.../seatunnel/rabbitmq/config/RabbitmqConfig.java | 12 +---
.../e2e/connector/rabbitmq/RabbitmqIT.java | 63 +++++++++++----------
.../rabbitmq-to-rabbitmq-using-default-config.conf | 66 ++++++++++++++++++++++
6 files changed, 143 insertions(+), 40 deletions(-)
diff --git a/.dlc.json b/.dlc.json
index 42e7bd5edf..35a04e1d21 100644
--- a/.dlc.json
+++ b/.dlc.json
@@ -39,3 +39,4 @@
403
]
}
+
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md
b/docs/en/connector-v2/sink/Rabbitmq.md
index 7e7fb5ef59..b71ab415ed 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -30,6 +30,9 @@ Used to write data to Rabbitmq.
| connection_timeout | int | no | - |
| rabbitmq.config | map | no | - |
| common-options | | no | - |
+| durable | boolean | no | true |
+| exclusive | boolean | no | false |
+| auto_delete | boolean | no | false |
### host [string]
@@ -108,6 +111,22 @@ In addition to the above parameters that must be specified
by the RabbitMQ clien
Sink plugin common parameters, please refer to [Sink Common
Options](../sink-common-options.md) for details
+### durable
+
+- true: The queue will survive on server restart.
+- false: The queue will be deleted on server restart.
+
+### exclusive
+
+- true: The queue is used only by the current connection and will be deleted
when the connection closes.
+- false: The queue can be used by multiple connections.
+
+### auto-delete
+
+- true: The queue will be deleted automatically when the last consumer
unsubscribes.
+- false: The queue will not be automatically deleted.
+
+
## Example
simple:
diff --git a/docs/en/connector-v2/source/Rabbitmq.md
b/docs/en/connector-v2/source/Rabbitmq.md
index bce43b430a..068daee7cd 100644
--- a/docs/en/connector-v2/source/Rabbitmq.md
+++ b/docs/en/connector-v2/source/Rabbitmq.md
@@ -25,8 +25,8 @@ The source must be non-parallel (parallelism set to 1) in
order to achieve exact
## Options
-| name | type | required | default value |
-|----------------------------|---------|----------|---------------|
+| name | type | required | default value |
+| -------------------------- | ------- | -------- | ------------- |
| host | string | yes | - |
| port | int | yes | - |
| virtual_host | string | yes | - |
@@ -47,6 +47,9 @@ The source must be non-parallel (parallelism set to 1) in
order to achieve exact
| prefetch_count | int | no | - |
| delivery_timeout | long | no | - |
| common-options | | no | - |
+| durable | boolean | no | true |
+| exclusive | boolean | no | false |
+| auto_delete | boolean | no | false |
### host [string]
@@ -132,6 +135,21 @@ deliveryTimeout maximum wait time, in milliseconds, for
the next message deliver
Source plugin common parameters, please refer to [Source Common
Options](../source-common-options.md) for details
+### durable
+
+- true: The queue will survive on server restart.
+- false: The queue will be deleted on server restart.
+
+### exclusive
+
+- true: The queue is used only by the current connection and will be deleted
when the connection closes.
+- false: The queue can be used by multiple connections.
+
+### auto-delete
+
+- true: The queue will be deleted automatically when the last consumer
unsubscribes.
+- false: The queue will not be automatically deleted.
+
## Example
simple:
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index ba67ed3c69..8ede2a6b91 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -114,15 +114,9 @@ public class RabbitmqConfig implements Serializable {
if
(config.getOptional(RabbitmqSourceOptions.USE_CORRELATION_ID).isPresent()) {
this.usesCorrelationId =
config.get(RabbitmqSourceOptions.USE_CORRELATION_ID);
}
- if (config.getOptional(RabbitmqBaseOptions.DURABLE).isPresent()) {
- this.durable = config.get(RabbitmqBaseOptions.DURABLE);
- }
- if (config.getOptional(RabbitmqBaseOptions.EXCLUSIVE).isPresent()) {
- this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
- }
- if (config.getOptional(RabbitmqBaseOptions.AUTO_DELETE).isPresent()) {
- this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
- }
+ this.durable = config.get(RabbitmqBaseOptions.DURABLE);
+ this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
+ this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
this.sinkOptionProps = config.get(RabbitmqSinkOptions.RABBITMQ_CONFIG);
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
index a846949d85..246bb05d76 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java
@@ -71,8 +71,6 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
private static final String IMAGE = "rabbitmq:3-management";
private static final String HOST = "rabbitmq-e2e";
private static final int PORT = 5672;
- private static final String QUEUE_NAME = "test";
- private static final String SINK_QUEUE_NAME = "test1";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final Boolean DURABLE = true;
@@ -86,7 +84,6 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
private GenericContainer<?> rabbitmqContainer;
Connection connection;
- RabbitmqClient rabbitmqClient;
@BeforeAll
@Override
@@ -102,10 +99,10 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
.withStartupTimeout(Duration.ofMinutes(2)));
Startables.deepStart(Stream.of(rabbitmqContainer)).join();
log.info("rabbitmq container started");
- this.initRabbitMQ();
}
- private void initSourceData() throws IOException, InterruptedException {
+ private void initSourceData(RabbitmqClient rabbitmqClient)
+ throws IOException, InterruptedException {
List<SeaTunnelRow> rows = TEST_DATASET.getValue();
for (int i = 0; i < rows.size(); i++) {
rabbitmqClient.write(
@@ -179,31 +176,12 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
return Pair.of(rowType, rows);
}
- private void initRabbitMQ() {
+ private RabbitmqClient getRabbitmqClient(String queueName) {
try {
RabbitmqConfig config = new RabbitmqConfig();
config.setHost(rabbitmqContainer.getHost());
config.setPort(rabbitmqContainer.getFirstMappedPort());
- config.setQueueName(QUEUE_NAME);
- config.setVirtualHost("/");
- config.setUsername(USERNAME);
- config.setPassword(PASSWORD);
- config.setDurable(DURABLE);
- config.setExclusive(EXCLUSIVE);
- config.setAutoDelete(AUTO_DELETE);
- rabbitmqClient = new RabbitmqClient(config);
- } catch (Exception e) {
- throw new RuntimeException("init Rabbitmq error", e);
- }
- }
-
- private RabbitmqClient initSinkRabbitMQ() {
-
- try {
- RabbitmqConfig config = new RabbitmqConfig();
- config.setHost(rabbitmqContainer.getHost());
- config.setPort(rabbitmqContainer.getFirstMappedPort());
- config.setQueueName(SINK_QUEUE_NAME);
+ config.setQueueName(queueName);
config.setVirtualHost("/");
config.setUsername(USERNAME);
config.setPassword(PASSWORD);
@@ -227,16 +205,19 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testRabbitMQ(TestContainer container) throws Exception {
+ final String sourceQueueName = "test";
+ final String sinkQueueName = "test1";
+ RabbitmqClient sourceClient = this.getRabbitmqClient(sourceQueueName);
// send data to source queue before executeJob start in every
testContainer
- initSourceData();
+ initSourceData(sourceClient);
// init consumer client before executeJob start in every testContainer
- RabbitmqClient sinkRabbitmqClient = initSinkRabbitMQ();
+ RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
Set<String> resultSet = new HashSet<>();
Handover handover = new Handover<>();
DefaultConsumer consumer =
sinkRabbitmqClient.getQueueingConsumer(handover);
- sinkRabbitmqClient.getChannel().basicConsume(SINK_QUEUE_NAME, true,
consumer);
+ sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true,
consumer);
// assert execute Job code
Container.ExecResult execResult =
container.executeJob("/rabbitmq-to-rabbitmq.conf");
Assertions.assertEquals(0, execResult.getExitCode());
@@ -263,4 +244,28 @@ public class RabbitmqIT extends TestSuiteBase implements
TestResource {
JSON_SERIALIZATION_SCHEMA.serialize(
TEST_DATASET.getValue().get(1)))));
}
+
+ @TestTemplate
+ public void testRabbitMQUSingDefaultConfig(TestContainer container) throws
Exception {
+ final String sourceQueueName = "test2_0";
+ final String sinkQueueName = "test2_1";
+ RabbitmqClient sourceClient = this.getRabbitmqClient(sourceQueueName);
+ // send data to source queue before executeJob start in every
testContainer
+ initSourceData(sourceClient);
+
+ // init consumer client before executeJob start in every testContainer
+ RabbitmqClient sinkRabbitmqClient = getRabbitmqClient(sinkQueueName);
+
+ Handover handover = new Handover<>();
+ DefaultConsumer consumer =
sinkRabbitmqClient.getQueueingConsumer(handover);
+ sinkRabbitmqClient.getChannel().basicConsume(sinkQueueName, true,
consumer);
+ // assert execute Job code
+ Container.ExecResult execResult = null;
+ try {
+ execResult =
container.executeJob("/rabbitmq-to-rabbitmq-using-default-config.conf");
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
new file mode 100644
index 0000000000..d91a6d5e6d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq-using-default-config.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ RabbitMQ {
+ host = "rabbitmq-e2e"
+ port = 5672
+ virtual_host = "/"
+ username = "guest"
+ password = "guest"
+ queue_name = "test2_0"
+ for_e2e_testing = true
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ RabbitMQ {
+ host = "rabbitmq-e2e"
+ port = 5672
+ virtual_host = "/"
+ username = "guest"
+ password = "guest"
+ queue_name = "test2_1"
+ }
+}
\ No newline at end of file