This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4eec9be012 [Improve] rabbit mq options (#8740)
4eec9be012 is described below

commit 4eec9be012a685fa2be13590c5c967b7cd6eef3b
Author: Jarvis <jar...@apache.org>
AuthorDate: Wed Feb 19 21:42:57 2025 +0800

    [Improve] rabbit mq options (#8740)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   2 -
 .../rabbitmq/config/RabbitmqBaseOptions.java       | 135 ++++++++++
 .../seatunnel/rabbitmq/config/RabbitmqConfig.java  | 271 ++++-----------------
 .../rabbitmq/config/RabbitmqSinkOptions.java       |  35 +++
 .../rabbitmq/config/RabbitmqSourceOptions.java     |  63 +++++
 .../seatunnel/rabbitmq/sink/RabbitmqSink.java      |  59 +----
 .../rabbitmq/sink/RabbitmqSinkFactory.java         |  54 ++--
 .../seatunnel/rabbitmq/source/RabbitmqSource.java  |  62 +----
 .../rabbitmq/source/RabbitmqSourceFactory.java     |  74 +++---
 9 files changed, 371 insertions(+), 384 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 7dce5ee5f5..533c622602 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -171,7 +171,6 @@ public class ConnectorOptionCheckTest {
         Set<String> whiteList = new HashSet<>();
         whiteList.add("JdbcSinkOptions");
         whiteList.add("TypesenseSourceOptions");
-        whiteList.add("RabbitmqSourceOptions");
         whiteList.add("TypesenseSinkOptions");
         whiteList.add("EmailSinkOptions");
         whiteList.add("HudiSinkOptions");
@@ -188,7 +187,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("MongodbSinkOptions");
         whiteList.add("IoTDBSinkOptions");
         whiteList.add("EasysearchSourceOptions");
-        whiteList.add("RabbitmqSinkOptions");
         whiteList.add("IcebergSourceOptions");
         whiteList.add("HbaseSourceOptions");
         whiteList.add("PaimonSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java
new file mode 100644
index 0000000000..ff71f860e3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqBaseOptions.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+
+public class RabbitmqBaseOptions extends ConnectorCommonOptions {
+
+    public static final Option<String> HOST =
+            Options.key("host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the default host to use for 
connections");
+
+    public static final Option<Integer> PORT =
+            Options.key("port")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("the default port to use for 
connections");
+
+    public static final Option<String> VIRTUAL_HOST =
+            Options.key("virtual_host")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the virtual host to use when connecting 
to the broker");
+
+    public static final Option<String> QUEUE_NAME =
+            Options.key("queue_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the queue to write the message to");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the AMQP user name to use when 
connecting to the broker");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the password to use when connecting to 
the broker");
+
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "convenience method for setting the fields in an 
AMQP URI: host, port, username, password and virtual host");
+
+    public static final Option<String> ROUTING_KEY =
+            Options.key("routing_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the routing key to publish the message 
to");
+
+    public static final Option<String> EXCHANGE =
+            Options.key("exchange")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the exchange to publish the message to");
+
+    public static final Option<Integer> NETWORK_RECOVERY_INTERVAL =
+            Options.key("network_recovery_interval")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "how long will automatic recovery wait before 
attempting to reconnect, in ms");
+
+    public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED =
+            Options.key("topology_recovery_enabled")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, enables topology recovery");
+
+    public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED =
+            Options.key("AUTOMATIC_RECOVERY_ENABLED")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("if true, enables connection recovery");
+
+    public static final Option<Integer> CONNECTION_TIMEOUT =
+            Options.key("connection_timeout")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("connection TCP establishment timeout in 
milliseconds");
+
+    public static final Option<Boolean> FOR_E2E_TESTING =
+            Options.key("for_e2e_testing")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription("use to recognize E2E mode");
+
+    public static final Option<Boolean> DURABLE =
+            Options.key("durable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "true: The queue will survive a server restart."
+                                    + "false: The queue will be deleted on 
server restart.");
+
+    public static final Option<Boolean> EXCLUSIVE =
+            Options.key("exclusive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "true: The queue is used only by the current 
connection and will be deleted when the connection closes."
+                                    + "false: The queue can be used by 
multiple connections.");
+
+    public static final Option<Boolean> AUTO_DELETE =
+            Options.key("auto_delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "true: The queue will be deleted automatically 
when the last consumer unsubscribes."
+                                    + "false: The queue will not be 
automatically deleted.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index 2872c9af81..ba67ed3c69 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -17,24 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
 
-import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 @Setter
 @Getter
+@NoArgsConstructor
 @AllArgsConstructor
 public class RabbitmqConfig implements Serializable {
     private String host;
@@ -63,241 +59,70 @@ public class RabbitmqConfig implements Serializable {
     private boolean forE2ETesting = false;
     private boolean usesCorrelationId = false;
 
-    private final Map<String, Object> sinkOptionProps = new HashMap<>();
-
-    public static final Option<String> HOST =
-            Options.key("host")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the default host to use for 
connections");
-
-    public static final Option<Integer> PORT =
-            Options.key("port")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the default port to use for 
connections");
-
-    public static final Option<String> VIRTUAL_HOST =
-            Options.key("virtual_host")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the virtual host to use when connecting 
to the broker");
-
-    public static final Option<String> USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the AMQP user name to use when 
connecting to the broker");
-
-    public static final Option<String> PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the password to use when connecting to 
the broker");
-
-    public static final Option<String> QUEUE_NAME =
-            Options.key("queue_name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the queue to write the message to");
-
-    public static final Option<String> URL =
-            Options.key("url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "convenience method for setting the fields in an 
AMQP URI: host, port, username, password and virtual host");
-
-    public static final Option<Integer> NETWORK_RECOVERY_INTERVAL =
-            Options.key("network_recovery_interval")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "how long will automatic recovery wait before 
attempting to reconnect, in ms");
-
-    public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED =
-            Options.key("AUTOMATIC_RECOVERY_ENABLED")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription("if true, enables connection recovery");
-
-    public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED =
-            Options.key("topology_recovery_enabled")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription("if true, enables topology recovery");
-
-    public static final Option<Integer> CONNECTION_TIMEOUT =
-            Options.key("connection_timeout")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("connection TCP establishment timeout in 
milliseconds");
-
-    public static final Option<Integer> REQUESTED_CHANNEL_MAX =
-            Options.key("requested_channel_max")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("initially requested maximum channel 
number");
-
-    public static final Option<Integer> REQUESTED_FRAME_MAX =
-            Options.key("requested_frame_max")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the requested maximum frame size");
-
-    public static final Option<Integer> REQUESTED_HEARTBEAT =
-            Options.key("requested_heartbeat")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("the requested heartbeat timeout");
+    private Map<String, String> sinkOptionProps = new HashMap<>();
 
-    public static final Option<Long> PREFETCH_COUNT =
-            Options.key("prefetch_count")
-                    .longType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "prefetchCount the max number of messages to 
receive without acknowledgement\n");
-
-    public static final Option<Integer> DELIVERY_TIMEOUT =
-            Options.key("delivery_timeout")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("deliveryTimeout maximum wait time");
-
-    public static final Option<String> ROUTING_KEY =
-            Options.key("routing_key")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the routing key to publish the message 
to");
-
-    public static final Option<String> EXCHANGE =
-            Options.key("exchange")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the exchange to publish the message to");
-
-    public static final Option<Boolean> FOR_E2E_TESTING =
-            Options.key("for_e2e_testing")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription("use to recognize E2E mode");
-
-    public static final Option<Map<String, String>> RABBITMQ_CONFIG =
-            Options.key("rabbitmq.config")
-                    .mapType()
-                    .defaultValue(Collections.emptyMap())
-                    .withDescription(
-                            "In addition to the above parameters that must be 
specified by the RabbitMQ client, the user can also specify multiple 
non-mandatory parameters for the client, "
-                                    + "covering [all the parameters specified 
in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
-
-    public static final Option<Boolean> USE_CORRELATION_ID =
-            Options.key("use_correlation_id")
-                    .booleanType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Whether the messages received are supplied with a 
unique"
-                                    + "id to deduplicate messages (in case of 
failed acknowledgments).");
-
-    public static final Option<Boolean> DURABLE =
-            Options.key("durable")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "true: The queue will survive a server restart."
-                                    + "false: The queue will be deleted on 
server restart.");
-
-    public static final Option<Boolean> EXCLUSIVE =
-            Options.key("exclusive")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "true: The queue is used only by the current 
connection and will be deleted when the connection closes."
-                                    + "false: The queue can be used by 
multiple connections.");
-
-    public static final Option<Boolean> AUTO_DELETE =
-            Options.key("auto_delete")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "true: The queue will be deleted automatically 
when the last consumer unsubscribes."
-                                    + "false: The queue will not be 
automatically deleted.");
-
-    private void parseSinkOptionProperties(Config pluginConfig) {
-        if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) 
{
-            pluginConfig
-                    .getObject(RABBITMQ_CONFIG.key())
-                    .forEach(
-                            (key, value) -> {
-                                final String configKey = key.toLowerCase();
-                                this.sinkOptionProps.put(configKey, 
value.unwrapped());
-                            });
+    public RabbitmqConfig(ReadonlyConfig config) {
+        this.host = config.get(RabbitmqBaseOptions.HOST);
+        this.port = config.get(RabbitmqBaseOptions.PORT);
+        this.queueName = config.get(RabbitmqBaseOptions.QUEUE_NAME);
+        if (config.getOptional(RabbitmqBaseOptions.USERNAME).isPresent()) {
+            this.username = config.get(RabbitmqBaseOptions.USERNAME);
         }
-    }
-
-    public RabbitmqConfig(Config config) {
-        this.host = config.getString(HOST.key());
-        this.port = config.getInt(PORT.key());
-        this.queueName = config.getString(QUEUE_NAME.key());
-        if (config.hasPath(USERNAME.key())) {
-            this.username = config.getString(USERNAME.key());
-        }
-        if (config.hasPath(PASSWORD.key())) {
-            this.password = config.getString(PASSWORD.key());
+        if (config.getOptional(RabbitmqBaseOptions.PASSWORD).isPresent()) {
+            this.password = config.get(RabbitmqBaseOptions.PASSWORD);
         }
-        if (config.hasPath(VIRTUAL_HOST.key())) {
-            this.virtualHost = config.getString(VIRTUAL_HOST.key());
+        if (config.getOptional(RabbitmqBaseOptions.VIRTUAL_HOST).isPresent()) {
+            this.virtualHost = config.get(RabbitmqBaseOptions.VIRTUAL_HOST);
         }
-        if (config.hasPath(NETWORK_RECOVERY_INTERVAL.key())) {
-            this.networkRecoveryInterval = 
config.getInt(NETWORK_RECOVERY_INTERVAL.key());
+        if 
(config.getOptional(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL).isPresent()) 
{
+            this.networkRecoveryInterval =
+                    config.get(RabbitmqBaseOptions.NETWORK_RECOVERY_INTERVAL);
         }
-        if (config.hasPath(AUTOMATIC_RECOVERY_ENABLED.key())) {
-            this.automaticRecovery = 
config.getBoolean(AUTOMATIC_RECOVERY_ENABLED.key());
+        if 
(config.getOptional(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED).isPresent())
 {
+            this.automaticRecovery = 
config.get(RabbitmqBaseOptions.AUTOMATIC_RECOVERY_ENABLED);
         }
-        if (config.hasPath(TOPOLOGY_RECOVERY_ENABLED.key())) {
-            this.topologyRecovery = 
config.getBoolean(TOPOLOGY_RECOVERY_ENABLED.key());
+        if 
(config.getOptional(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED).isPresent()) 
{
+            this.topologyRecovery = 
config.get(RabbitmqBaseOptions.TOPOLOGY_RECOVERY_ENABLED);
         }
-        if (config.hasPath(CONNECTION_TIMEOUT.key())) {
-            this.connectionTimeout = config.getInt(CONNECTION_TIMEOUT.key());
+        if 
(config.getOptional(RabbitmqBaseOptions.CONNECTION_TIMEOUT).isPresent()) {
+            this.connectionTimeout = 
config.get(RabbitmqBaseOptions.CONNECTION_TIMEOUT);
         }
-        if (config.hasPath(REQUESTED_CHANNEL_MAX.key())) {
-            this.requestedChannelMax = 
config.getInt(REQUESTED_CHANNEL_MAX.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX).isPresent()) {
+            this.requestedChannelMax = 
config.get(RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX);
         }
-        if (config.hasPath(REQUESTED_FRAME_MAX.key())) {
-            this.requestedFrameMax = config.getInt(REQUESTED_FRAME_MAX.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.REQUESTED_FRAME_MAX).isPresent()) {
+            this.requestedFrameMax = 
config.get(RabbitmqSourceOptions.REQUESTED_FRAME_MAX);
         }
-        if (config.hasPath(REQUESTED_HEARTBEAT.key())) {
-            this.requestedHeartbeat = config.getInt(REQUESTED_HEARTBEAT.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.REQUESTED_HEARTBEAT).isPresent()) {
+            this.requestedHeartbeat = 
config.get(RabbitmqSourceOptions.REQUESTED_HEARTBEAT);
         }
-        if (config.hasPath(PREFETCH_COUNT.key())) {
-            this.prefetchCount = config.getInt(PREFETCH_COUNT.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.PREFETCH_COUNT).isPresent()) {
+            this.prefetchCount = 
config.get(RabbitmqSourceOptions.PREFETCH_COUNT);
         }
-        if (config.hasPath(DELIVERY_TIMEOUT.key())) {
-            this.deliveryTimeout = config.getInt(DELIVERY_TIMEOUT.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.DELIVERY_TIMEOUT).isPresent()) {
+            this.deliveryTimeout = 
config.get(RabbitmqSourceOptions.DELIVERY_TIMEOUT);
         }
-        if (config.hasPath(ROUTING_KEY.key())) {
-            this.routingKey = config.getString(ROUTING_KEY.key());
+        if (config.getOptional(RabbitmqBaseOptions.ROUTING_KEY).isPresent()) {
+            this.routingKey = config.get(RabbitmqBaseOptions.ROUTING_KEY);
         }
-        if (config.hasPath(EXCHANGE.key())) {
-            this.exchange = config.getString(EXCHANGE.key());
+        if (config.getOptional(RabbitmqBaseOptions.EXCHANGE).isPresent()) {
+            this.exchange = config.get(RabbitmqBaseOptions.EXCHANGE);
         }
-        if (config.hasPath(FOR_E2E_TESTING.key())) {
-            this.forE2ETesting = config.getBoolean(FOR_E2E_TESTING.key());
+        if 
(config.getOptional(RabbitmqBaseOptions.FOR_E2E_TESTING).isPresent()) {
+            this.forE2ETesting = 
config.get(RabbitmqBaseOptions.FOR_E2E_TESTING);
         }
-        if (config.hasPath(USE_CORRELATION_ID.key())) {
-            this.usesCorrelationId = 
config.getBoolean(USE_CORRELATION_ID.key());
+        if 
(config.getOptional(RabbitmqSourceOptions.USE_CORRELATION_ID).isPresent()) {
+            this.usesCorrelationId = 
config.get(RabbitmqSourceOptions.USE_CORRELATION_ID);
         }
-        if (config.hasPath(DURABLE.key())) {
-            this.durable = config.getBoolean(DURABLE.key());
+        if (config.getOptional(RabbitmqBaseOptions.DURABLE).isPresent()) {
+            this.durable = config.get(RabbitmqBaseOptions.DURABLE);
         }
-        if (config.hasPath(EXCLUSIVE.key())) {
-            this.exclusive = config.getBoolean(EXCLUSIVE.key());
+        if (config.getOptional(RabbitmqBaseOptions.EXCLUSIVE).isPresent()) {
+            this.exclusive = config.get(RabbitmqBaseOptions.EXCLUSIVE);
         }
-        if (config.hasPath(AUTO_DELETE.key())) {
-            this.autoDelete = config.getBoolean(AUTO_DELETE.key());
+        if (config.getOptional(RabbitmqBaseOptions.AUTO_DELETE).isPresent()) {
+            this.autoDelete = config.get(RabbitmqBaseOptions.AUTO_DELETE);
         }
-        parseSinkOptionProperties(config);
+        this.sinkOptionProps = config.get(RabbitmqSinkOptions.RABBITMQ_CONFIG);
     }
-
-    @VisibleForTesting
-    public RabbitmqConfig() {}
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
new file mode 100644
index 0000000000..cfd602596f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSinkOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class RabbitmqSinkOptions extends RabbitmqBaseOptions {
+
+    public static final Option<Map<String, String>> RABBITMQ_CONFIG =
+            Options.key("rabbitmq.config")
+                    .mapType()
+                    .defaultValue(Collections.emptyMap())
+                    .withDescription(
+                            "In addition to the above parameters that must be 
specified by the RabbitMQ client, the user can also specify multiple 
non-mandatory parameters for the client, "
+                                    + "covering [all the parameters specified 
in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
new file mode 100644
index 0000000000..dcc72f2067
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqSourceOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class RabbitmqSourceOptions extends RabbitmqBaseOptions {
+
+    public static final Option<Integer> REQUESTED_CHANNEL_MAX =
+            Options.key("requested_channel_max")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("initially requested maximum channel 
number");
+
+    public static final Option<Integer> REQUESTED_FRAME_MAX =
+            Options.key("requested_frame_max")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("the requested maximum frame size");
+
+    public static final Option<Integer> REQUESTED_HEARTBEAT =
+            Options.key("requested_heartbeat")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("the requested heartbeat timeout");
+
+    public static final Option<Integer> PREFETCH_COUNT =
+            Options.key("prefetch_count")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "prefetchCount the max number of messages to 
receive without acknowledgement\n");
+
+    public static final Option<Integer> DELIVERY_TIMEOUT =
+            Options.key("delivery_timeout")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("deliveryTimeout maximum wait time");
+
+    public static final Option<Boolean> USE_CORRELATION_ID =
+            Options.key("use_correlation_id")
+                    .booleanType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Whether the messages received are supplied with a 
unique"
+                                    + "id to deduplicate messages (in case of 
failed acknowledgments).");
+}
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
index 7d4f26272b..bfcedcc340 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSink.java
@@ -17,82 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
-
-@AutoService(SeaTunnelSink.class)
 public class RabbitmqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-    private SeaTunnelRowType seaTunnelRowType;
-    private Config pluginConfig;
-    private RabbitmqConfig rabbitMQConfig;
+
+    private final RabbitmqConfig rabbitMQConfig;
+    private final CatalogTable catalogTable;
 
     @Override
     public String getPluginName() {
         return "RabbitMQ";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        HOST.key(),
-                        PORT.key(),
-                        VIRTUAL_HOST.key(),
-                        USERNAME.key(),
-                        PASSWORD.key(),
-                        QUEUE_NAME.key());
-        if (!result.isSuccess()) {
-            throw new RabbitmqConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        rabbitMQConfig = new RabbitmqConfig(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public RabbitmqSink(RabbitmqConfig rabbitMQConfig, CatalogTable 
catalogTable) {
+        this.rabbitMQConfig = rabbitMQConfig;
+        this.catalogTable = catalogTable;
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new RabbitmqSinkWriter(rabbitMQConfig, seaTunnelRowType);
+        return new RabbitmqSinkWriter(rabbitMQConfig, 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
index 4618d351d9..540707d0f0 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
@@ -18,26 +18,15 @@
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
-
 @AutoService(Factory.class)
 public class RabbitmqSinkFactory implements TableSinkFactory {
 
@@ -49,17 +38,32 @@ public class RabbitmqSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME)
-                .bundled(USERNAME, PASSWORD)
+                .required(
+                        RabbitmqSinkOptions.HOST,
+                        RabbitmqSinkOptions.PORT,
+                        RabbitmqSinkOptions.VIRTUAL_HOST,
+                        RabbitmqSinkOptions.QUEUE_NAME)
+                .bundled(RabbitmqSinkOptions.USERNAME, 
RabbitmqSinkOptions.PASSWORD)
                 .optional(
-                        URL,
-                        ROUTING_KEY,
-                        EXCHANGE,
-                        NETWORK_RECOVERY_INTERVAL,
-                        TOPOLOGY_RECOVERY_ENABLED,
-                        AUTOMATIC_RECOVERY_ENABLED,
-                        CONNECTION_TIMEOUT,
-                        RABBITMQ_CONFIG)
+                        RabbitmqSinkOptions.URL,
+                        RabbitmqSinkOptions.ROUTING_KEY,
+                        RabbitmqSinkOptions.EXCHANGE,
+                        RabbitmqSinkOptions.NETWORK_RECOVERY_INTERVAL,
+                        RabbitmqSinkOptions.TOPOLOGY_RECOVERY_ENABLED,
+                        RabbitmqSinkOptions.AUTOMATIC_RECOVERY_ENABLED,
+                        RabbitmqSinkOptions.CONNECTION_TIMEOUT,
+                        RabbitmqSinkOptions.FOR_E2E_TESTING,
+                        RabbitmqSinkOptions.DURABLE,
+                        RabbitmqSinkOptions.EXCLUSIVE,
+                        RabbitmqSinkOptions.AUTO_DELETE,
+                        RabbitmqSinkOptions.RABBITMQ_CONFIG)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () ->
+                new RabbitmqSink(
+                        new RabbitmqConfig(context.getOptions()), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
index 01146f26ac..afe108f229 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSource.java
@@ -17,12 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -30,11 +26,7 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
@@ -43,23 +35,23 @@ import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
 import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplitEnumeratorState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
-import com.google.auto.service.AutoService;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+import java.util.Collections;
+import java.util.List;
 
-@AutoService(SeaTunnelSource.class)
 public class RabbitmqSource
         implements SeaTunnelSource<SeaTunnelRow, RabbitmqSplit, 
RabbitmqSplitEnumeratorState>,
                 SupportParallelism {
 
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private JobContext jobContext;
-    private RabbitmqConfig rabbitMQConfig;
+    private final RabbitmqConfig rabbitMQConfig;
+    private final CatalogTable catalogTable;
+
+    public RabbitmqSource(RabbitmqConfig rabbitMQConfig, CatalogTable 
catalogTable) {
+        this.rabbitMQConfig = rabbitMQConfig;
+        this.catalogTable = catalogTable;
+        this.deserializationSchema = new 
JsonDeserializationSchema(catalogTable, false, false);
+    }
 
     @Override
     public Boundedness getBoundedness() {
@@ -79,31 +71,8 @@ public class RabbitmqSource
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        config,
-                        HOST.key(),
-                        PORT.key(),
-                        VIRTUAL_HOST.key(),
-                        USERNAME.key(),
-                        PASSWORD.key(),
-                        QUEUE_NAME.key(),
-                        ConnectorCommonOptions.SCHEMA.key());
-        if (!result.isSuccess()) {
-            throw new RabbitmqConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        this.rabbitMQConfig = new RabbitmqConfig(config);
-        setDeserialization(config);
-    }
-
-    @Override
-    public SeaTunnelDataType getProducedType() {
-        return deserializationSchema.getProducedType();
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
@@ -130,11 +99,4 @@ public class RabbitmqSource
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
     }
-
-    private void setDeserialization(Config config) {
-        // TODO: format SPI
-        // only support json deserializationSchema
-        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config);
-        this.deserializationSchema = new 
JsonDeserializationSchema(catalogTable, false, false);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
index 212b194065..a45a2d11a1 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceFactory.java
@@ -18,31 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSinkOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqSourceOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.AUTOMATIC_RECOVERY_ENABLED;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.CONNECTION_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.DELIVERY_TIMEOUT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.EXCHANGE;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.NETWORK_RECOVERY_INTERVAL;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PREFETCH_COUNT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_CHANNEL_MAX;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_FRAME_MAX;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.REQUESTED_HEARTBEAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.USERNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.VIRTUAL_HOST;
+import java.io.Serializable;
 
 @AutoService(Factory.class)
 public class RabbitmqSourceFactory implements TableSourceFactory {
@@ -54,24 +43,43 @@ public class RabbitmqSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HOST, PORT, VIRTUAL_HOST, QUEUE_NAME, 
ConnectorCommonOptions.SCHEMA)
-                .bundled(USERNAME, PASSWORD)
+                .required(
+                        RabbitmqSourceOptions.HOST,
+                        RabbitmqSourceOptions.PORT,
+                        RabbitmqSourceOptions.VIRTUAL_HOST,
+                        RabbitmqSourceOptions.QUEUE_NAME,
+                        RabbitmqSourceOptions.SCHEMA)
+                .bundled(RabbitmqSourceOptions.USERNAME, 
RabbitmqSourceOptions.PASSWORD)
                 .optional(
-                        URL,
-                        ROUTING_KEY,
-                        EXCHANGE,
-                        NETWORK_RECOVERY_INTERVAL,
-                        TOPOLOGY_RECOVERY_ENABLED,
-                        AUTOMATIC_RECOVERY_ENABLED,
-                        CONNECTION_TIMEOUT,
-                        REQUESTED_CHANNEL_MAX,
-                        REQUESTED_FRAME_MAX,
-                        REQUESTED_HEARTBEAT,
-                        PREFETCH_COUNT,
-                        DELIVERY_TIMEOUT)
+                        RabbitmqSourceOptions.URL,
+                        RabbitmqSourceOptions.ROUTING_KEY,
+                        RabbitmqSourceOptions.EXCHANGE,
+                        RabbitmqSourceOptions.NETWORK_RECOVERY_INTERVAL,
+                        RabbitmqSourceOptions.TOPOLOGY_RECOVERY_ENABLED,
+                        RabbitmqSourceOptions.AUTOMATIC_RECOVERY_ENABLED,
+                        RabbitmqSourceOptions.CONNECTION_TIMEOUT,
+                        RabbitmqSinkOptions.FOR_E2E_TESTING,
+                        RabbitmqSinkOptions.DURABLE,
+                        RabbitmqSinkOptions.EXCLUSIVE,
+                        RabbitmqSinkOptions.AUTO_DELETE,
+                        RabbitmqSourceOptions.REQUESTED_CHANNEL_MAX,
+                        RabbitmqSourceOptions.REQUESTED_FRAME_MAX,
+                        RabbitmqSourceOptions.REQUESTED_HEARTBEAT,
+                        RabbitmqSourceOptions.PREFETCH_COUNT,
+                        RabbitmqSourceOptions.DELIVERY_TIMEOUT)
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new RabbitmqSource(
+                                new RabbitmqConfig(context.getOptions()),
+                                
CatalogTableUtil.buildWithConfig(context.getOptions()));
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return RabbitmqSource.class;

Reply via email to