This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1b44b9b440 [improve] selectdb options (#9252)
1b44b9b440 is described below

commit 1b44b9b440c7ec52e6f85fbf0a28f4a51c32b39a
Author: Jarvis <[email protected]>
AuthorDate: Mon Jul 7 16:24:51 2025 +0800

    [improve] selectdb options (#9252)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |   1 -
 .../connectors/selectdb/config/SelectDBConfig.java | 165 +++------------------
 ...electDBConfig.java => SelectDBSinkOptions.java} | 101 ++-----------
 .../connectors/selectdb/sink/SelectDBSink.java     |  69 +++------
 .../selectdb/sink/SelectDBSinkFactory.java         |  63 ++++++++
 .../selectdb/sink/committer/SelectDBCommitter.java |   7 +-
 .../selectdb/sink/writer/SelectDBSinkWriter.java   |   6 +-
 .../starter/seatunnel/SeaTunnelConnectorTest.java  |   7 +-
 8 files changed, 119 insertions(+), 300 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index d433102de5..21a64a0c25 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
         Set<String> whiteList = new HashSet<>();
         whiteList.add("JdbcSinkOptions");
         whiteList.add("MongodbSinkOptions");
-        whiteList.add("SelectDBSinkOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
         whiteList.add("OracleIncrementalSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
index 50c3442c6b..8d6ba4c5b6 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
@@ -17,105 +17,18 @@
 
 package org.apache.seatunnel.connectors.selectdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
-import java.util.Map;
 import java.util.Properties;
-import java.util.UUID;
 
 @Setter
 @Getter
 @ToString
 public class SelectDBConfig {
-    private static final int DEFAULT_SINK_MAX_RETRIES = 3;
-    private static final int DEFAULT_SINK_BUFFER_SIZE = 10 * 1024 * 1024;
-    private static final int DEFAULT_SINK_BUFFER_COUNT = 10000;
-    // common option
-    public static final Option<String> LOAD_URL =
-            Options.key("load-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("SelectDB load http address.");
-    public static final Option<String> JDBC_URL =
-            Options.key("jdbc-url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("SelectDB jdbc query address.");
-    public static final Option<String> CLUSTER_NAME =
-            Options.key("cluster-name")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("SelectDB cluster name.");
-
-    public static final Option<String> TABLE_IDENTIFIER =
-            Options.key("table.identifier")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the jdbc table name.");
-    public static final Option<String> USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the jdbc user name.");
-    public static final Option<String> PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("the jdbc password.");
-
-    public static final Option<Boolean> SINK_ENABLE_2PC =
-            Options.key("sink.enable-2pc")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription("enable 2PC while loading");
-    // sink config options
-    public static final Option<Integer> SINK_MAX_RETRIES =
-            Options.key("sink.max-retries")
-                    .intType()
-                    .defaultValue(DEFAULT_SINK_MAX_RETRIES)
-                    .withDescription("the max retry times if writing records 
to database failed.");
-    public static final Option<Integer> SINK_BUFFER_SIZE =
-            Options.key("sink.buffer-size")
-                    .intType()
-                    .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
-                    .withDescription("the buffer size to cache data for stream 
load.");
-    public static final Option<Integer> SINK_BUFFER_COUNT =
-            Options.key("sink.buffer-count")
-                    .intType()
-                    .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
-                    .withDescription("the buffer count to cache data for 
stream load.");
-    public static final Option<String> SINK_LABEL_PREFIX =
-            Options.key("sink.label-prefix")
-                    .stringType()
-                    .defaultValue(UUID.randomUUID().toString())
-                    .withDescription("the unique label prefix.");
-    public static final Option<Boolean> SINK_ENABLE_DELETE =
-            Options.key("sink.enable-delete")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("whether to enable the delete function");
-
-    public static final Option<Integer> SINK_FLUSH_QUEUE_SIZE =
-            Options.key("sink.flush.queue-size")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("Queue length for async upload to object 
storage");
-
-    public static final Option<Map<String, String>> 
SELECTDB_SINK_CONFIG_PREFIX =
-            Options.key("selectdb.config")
-                    .mapType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The parameter of the Copy Into data_desc. "
-                                    + "The way to specify the parameter is to 
add the prefix `selectdb.config` to the original load parameter name ");
 
     private String loadUrl;
     private String jdbcUrl;
@@ -132,65 +45,27 @@ public class SelectDBConfig {
     private Integer flushQueueSize;
     private Properties StageLoadProps;
 
-    public static SelectDBConfig loadConfig(Config pluginConfig) {
+    public static SelectDBConfig loadConfig(ReadonlyConfig pluginConfig) {
         SelectDBConfig selectdbConfig = new SelectDBConfig();
-        selectdbConfig.setLoadUrl(pluginConfig.getString(LOAD_URL.key()));
-        selectdbConfig.setJdbcUrl(pluginConfig.getString(JDBC_URL.key()));
-        
selectdbConfig.setClusterName(pluginConfig.getString(CLUSTER_NAME.key()));
-        selectdbConfig.setUsername(pluginConfig.getString(USERNAME.key()));
-        selectdbConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-        
selectdbConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
-        
selectdbConfig.setStageLoadProps(parseCopyIntoProperties(pluginConfig));
-
-        if (pluginConfig.hasPath(SINK_LABEL_PREFIX.key())) {
-            
selectdbConfig.setLabelPrefix(pluginConfig.getString(SINK_LABEL_PREFIX.key()));
-        } else {
-            selectdbConfig.setLabelPrefix(SINK_LABEL_PREFIX.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_MAX_RETRIES.key())) {
-            
selectdbConfig.setMaxRetries(pluginConfig.getInt(SINK_MAX_RETRIES.key()));
-        } else {
-            selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
-            
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
-        } else {
-            selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
-            
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
-        } else {
-            selectdbConfig.setBufferSize(SINK_BUFFER_SIZE.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_BUFFER_COUNT.key())) {
-            
selectdbConfig.setBufferCount(pluginConfig.getInt(SINK_BUFFER_COUNT.key()));
-        } else {
-            selectdbConfig.setBufferCount(SINK_BUFFER_COUNT.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_ENABLE_DELETE.key())) {
-            
selectdbConfig.setEnableDelete(pluginConfig.getBoolean(SINK_ENABLE_DELETE.key()));
-        } else {
-            selectdbConfig.setEnableDelete(SINK_ENABLE_DELETE.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_FLUSH_QUEUE_SIZE.key())) {
-            
selectdbConfig.setFlushQueueSize(pluginConfig.getInt(SINK_FLUSH_QUEUE_SIZE.key()));
-        } else {
-            
selectdbConfig.setFlushQueueSize(SINK_FLUSH_QUEUE_SIZE.defaultValue());
+        
selectdbConfig.setLoadUrl(pluginConfig.get(SelectDBSinkOptions.LOAD_URL));
+        
selectdbConfig.setJdbcUrl(pluginConfig.get(SelectDBSinkOptions.JDBC_URL));
+        
selectdbConfig.setClusterName(pluginConfig.get(SelectDBSinkOptions.CLUSTER_NAME));
+        
selectdbConfig.setUsername(pluginConfig.get(SelectDBSinkOptions.USERNAME));
+        
selectdbConfig.setPassword(pluginConfig.get(SelectDBSinkOptions.PASSWORD));
+        
selectdbConfig.setTableIdentifier(pluginConfig.get(SelectDBSinkOptions.TABLE_IDENTIFIER));
+        if 
(pluginConfig.getOptional(SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX).isPresent())
 {
+            Properties properties = new Properties();
+            
properties.putAll(pluginConfig.get(SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX));
+            selectdbConfig.setStageLoadProps(properties);
         }
+        
selectdbConfig.setLabelPrefix(pluginConfig.get(SelectDBSinkOptions.SINK_LABEL_PREFIX));
+        
selectdbConfig.setMaxRetries(pluginConfig.get(SelectDBSinkOptions.SINK_MAX_RETRIES));
+        
selectdbConfig.setEnable2PC(pluginConfig.get(SelectDBSinkOptions.SINK_ENABLE_2PC));
+        
selectdbConfig.setBufferSize(pluginConfig.get(SelectDBSinkOptions.SINK_BUFFER_SIZE));
+        
selectdbConfig.setBufferCount(pluginConfig.get(SelectDBSinkOptions.SINK_BUFFER_COUNT));
+        
selectdbConfig.setEnableDelete(pluginConfig.get(SelectDBSinkOptions.SINK_ENABLE_DELETE));
+        selectdbConfig.setFlushQueueSize(
+                pluginConfig.get(SelectDBSinkOptions.SINK_FLUSH_QUEUE_SIZE));
         return selectdbConfig;
     }
-
-    private static Properties parseCopyIntoProperties(Config pluginConfig) {
-        Properties stageLoadProps = new Properties();
-        if (CheckConfigUtil.isValidParam(pluginConfig, 
SELECTDB_SINK_CONFIG_PREFIX.key())) {
-            pluginConfig
-                    .getObject(SELECTDB_SINK_CONFIG_PREFIX.key())
-                    .forEach(
-                            (key, value) -> {
-                                final String configKey = key.toLowerCase();
-                                stageLoadProps.put(configKey, 
value.unwrapped().toString());
-                            });
-        }
-        return stageLoadProps;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
similarity index 56%
copy from seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
copy to seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
index 50c3442c6b..7afb627c81 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
@@ -17,24 +17,16 @@
 
 package org.apache.seatunnel.connectors.selectdb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
 
 import java.util.Map;
-import java.util.Properties;
 import java.util.UUID;
 
-@Setter
-@Getter
-@ToString
-public class SelectDBConfig {
+public class SelectDBSinkOptions {
+
+    public static final String IDENTIFIER = "SelectDBCloud";
+
     private static final int DEFAULT_SINK_MAX_RETRIES = 3;
     private static final int DEFAULT_SINK_BUFFER_SIZE = 10 * 1024 * 1024;
     private static final int DEFAULT_SINK_BUFFER_COUNT = 10000;
@@ -44,11 +36,13 @@ public class SelectDBConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("SelectDB load http address.");
+
     public static final Option<String> JDBC_URL =
             Options.key("jdbc-url")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("SelectDB jdbc query address.");
+
     public static final Option<String> CLUSTER_NAME =
             Options.key("cluster-name")
                     .stringType()
@@ -60,11 +54,13 @@ public class SelectDBConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the jdbc table name.");
+
     public static final Option<String> USERNAME =
             Options.key("username")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("the jdbc user name.");
+
     public static final Option<String> PASSWORD =
             Options.key("password")
                     .stringType()
@@ -82,21 +78,25 @@ public class SelectDBConfig {
                     .intType()
                     .defaultValue(DEFAULT_SINK_MAX_RETRIES)
                     .withDescription("the max retry times if writing records 
to database failed.");
+
     public static final Option<Integer> SINK_BUFFER_SIZE =
             Options.key("sink.buffer-size")
                     .intType()
                     .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
                     .withDescription("the buffer size to cache data for stream 
load.");
+
     public static final Option<Integer> SINK_BUFFER_COUNT =
             Options.key("sink.buffer-count")
                     .intType()
                     .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
                     .withDescription("the buffer count to cache data for 
stream load.");
+
     public static final Option<String> SINK_LABEL_PREFIX =
             Options.key("sink.label-prefix")
                     .stringType()
                     .defaultValue(UUID.randomUUID().toString())
                     .withDescription("the unique label prefix.");
+
     public static final Option<Boolean> SINK_ENABLE_DELETE =
             Options.key("sink.enable-delete")
                     .booleanType()
@@ -116,81 +116,4 @@ public class SelectDBConfig {
                     .withDescription(
                             "The parameter of the Copy Into data_desc. "
                                     + "The way to specify the parameter is to 
add the prefix `selectdb.config` to the original load parameter name ");
-
-    private String loadUrl;
-    private String jdbcUrl;
-    private String clusterName;
-    private String username;
-    private String password;
-    private String tableIdentifier;
-    private Boolean enableDelete;
-    private String labelPrefix;
-    private boolean enable2PC;
-    private Integer maxRetries;
-    private Integer bufferSize;
-    private Integer bufferCount;
-    private Integer flushQueueSize;
-    private Properties StageLoadProps;
-
-    public static SelectDBConfig loadConfig(Config pluginConfig) {
-        SelectDBConfig selectdbConfig = new SelectDBConfig();
-        selectdbConfig.setLoadUrl(pluginConfig.getString(LOAD_URL.key()));
-        selectdbConfig.setJdbcUrl(pluginConfig.getString(JDBC_URL.key()));
-        
selectdbConfig.setClusterName(pluginConfig.getString(CLUSTER_NAME.key()));
-        selectdbConfig.setUsername(pluginConfig.getString(USERNAME.key()));
-        selectdbConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-        
selectdbConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
-        
selectdbConfig.setStageLoadProps(parseCopyIntoProperties(pluginConfig));
-
-        if (pluginConfig.hasPath(SINK_LABEL_PREFIX.key())) {
-            
selectdbConfig.setLabelPrefix(pluginConfig.getString(SINK_LABEL_PREFIX.key()));
-        } else {
-            selectdbConfig.setLabelPrefix(SINK_LABEL_PREFIX.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_MAX_RETRIES.key())) {
-            
selectdbConfig.setMaxRetries(pluginConfig.getInt(SINK_MAX_RETRIES.key()));
-        } else {
-            selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
-            
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
-        } else {
-            selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
-            
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
-        } else {
-            selectdbConfig.setBufferSize(SINK_BUFFER_SIZE.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_BUFFER_COUNT.key())) {
-            
selectdbConfig.setBufferCount(pluginConfig.getInt(SINK_BUFFER_COUNT.key()));
-        } else {
-            selectdbConfig.setBufferCount(SINK_BUFFER_COUNT.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_ENABLE_DELETE.key())) {
-            
selectdbConfig.setEnableDelete(pluginConfig.getBoolean(SINK_ENABLE_DELETE.key()));
-        } else {
-            selectdbConfig.setEnableDelete(SINK_ENABLE_DELETE.defaultValue());
-        }
-        if (pluginConfig.hasPath(SINK_FLUSH_QUEUE_SIZE.key())) {
-            
selectdbConfig.setFlushQueueSize(pluginConfig.getInt(SINK_FLUSH_QUEUE_SIZE.key()));
-        } else {
-            
selectdbConfig.setFlushQueueSize(SINK_FLUSH_QUEUE_SIZE.defaultValue());
-        }
-        return selectdbConfig;
-    }
-
-    private static Properties parseCopyIntoProperties(Config pluginConfig) {
-        Properties stageLoadProps = new Properties();
-        if (CheckConfigUtil.isValidParam(pluginConfig, 
SELECTDB_SINK_CONFIG_PREFIX.key())) {
-            pluginConfig
-                    .getObject(SELECTDB_SINK_CONFIG_PREFIX.key())
-                    .forEach(
-                            (key, value) -> {
-                                final String configKey = key.toLowerCase();
-                                stageLoadProps.put(configKey, 
value.unwrapped().toString());
-                            });
-        }
-        return stageLoadProps;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index 33222116cc..6c3e5bf76d 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -17,11 +17,8 @@
 
 package org.apache.seatunnel.connectors.selectdb.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
@@ -29,11 +26,8 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBSinkOptions;
 import 
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
 import 
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfoSerializer;
 import 
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitter;
@@ -41,50 +35,27 @@ import 
org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkState;
 import 
org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkStateSerializer;
 import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkWriter;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.CLUSTER_NAME;
-import static 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.JDBC_URL;
-import static 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.LOAD_URL;
-import static 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.TABLE_IDENTIFIER;
-import static 
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.USERNAME;
-
-@AutoService(SeaTunnelSink.class)
 public class SelectDBSink
         implements SeaTunnelSink<
                 SeaTunnelRow, SelectDBSinkState, SelectDBCommitInfo, 
SelectDBCommitInfo> {
-    private Config pluginConfig;
-    private SeaTunnelRowType seaTunnelRowType;
+
+    private final SelectDBConfig dbConfig;
+    private final CatalogTable catalogTable;
     private String jobId;
 
-    @Override
-    public String getPluginName() {
-        return "SelectDBCloud";
+    public SelectDBSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
+        this.dbConfig = SelectDBConfig.loadConfig(pluginConfig);
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        JDBC_URL.key(),
-                        LOAD_URL.key(),
-                        CLUSTER_NAME.key(),
-                        USERNAME.key(),
-                        TABLE_IDENTIFIER.key());
-        if (!result.isSuccess()) {
-            throw new SelectDBConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
+    public String getPluginName() {
+        return SelectDBSinkOptions.IDENTIFIER;
     }
 
     @Override
@@ -92,17 +63,16 @@ public class SelectDBSink
         this.jobId = jobContext.getJobId();
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> 
createWriter(
             SinkWriter.Context context) throws IOException {
         SelectDBSinkWriter selectDBSinkWriter =
                 new SelectDBSinkWriter(
-                        context, Collections.emptyList(), seaTunnelRowType, 
pluginConfig, jobId);
+                        context,
+                        Collections.emptyList(),
+                        catalogTable.getSeaTunnelRowType(),
+                        dbConfig,
+                        jobId);
         selectDBSinkWriter.initializeLoad(Collections.emptyList());
         return selectDBSinkWriter;
     }
@@ -111,7 +81,8 @@ public class SelectDBSink
     public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState> 
restoreWriter(
             SinkWriter.Context context, List<SelectDBSinkState> states) throws 
IOException {
         SelectDBSinkWriter selectDBSinkWriter =
-                new SelectDBSinkWriter(context, states, seaTunnelRowType, 
pluginConfig, jobId);
+                new SelectDBSinkWriter(
+                        context, states, catalogTable.getSeaTunnelRowType(), 
dbConfig, jobId);
         selectDBSinkWriter.initializeLoad(states);
         return selectDBSinkWriter;
     }
@@ -123,7 +94,7 @@ public class SelectDBSink
 
     @Override
     public Optional<SinkCommitter<SelectDBCommitInfo>> createCommitter() 
throws IOException {
-        return Optional.of(new SelectDBCommitter(pluginConfig));
+        return Optional.of(new SelectDBCommitter(dbConfig));
     }
 
     @Override
@@ -144,6 +115,6 @@ public class SelectDBSink
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return SeaTunnelSink.super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java
new file mode 100644
index 0000000000..289a9406ec
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class SelectDBSinkFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return SelectDBSinkOptions.IDENTIFIER;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        SelectDBSinkOptions.JDBC_URL,
+                        SelectDBSinkOptions.LOAD_URL,
+                        SelectDBSinkOptions.CLUSTER_NAME,
+                        SelectDBSinkOptions.USERNAME,
+                        SelectDBSinkOptions.TABLE_IDENTIFIER)
+                .optional(
+                        SelectDBSinkOptions.PASSWORD,
+                        SelectDBSinkOptions.SINK_ENABLE_2PC,
+                        SelectDBSinkOptions.SINK_MAX_RETRIES,
+                        SelectDBSinkOptions.SINK_BUFFER_SIZE,
+                        SelectDBSinkOptions.SINK_BUFFER_COUNT,
+                        SelectDBSinkOptions.SINK_LABEL_PREFIX,
+                        SelectDBSinkOptions.SINK_ENABLE_DELETE,
+                        SelectDBSinkOptions.SINK_FLUSH_QUEUE_SIZE,
+                        SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new SelectDBSink(context.getOptions(), 
context.getCatalogTable());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
index 9fea3e2ebf..cbb33474b8 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.selectdb.sink.committer;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
 import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
@@ -31,11 +29,8 @@ import java.util.List;
 
 @Slf4j
 public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
-    private final SelectDBConfig selectdbConfig;
 
-    public SelectDBCommitter(Config pluginConfig) {
-        this(SelectDBConfig.loadConfig(pluginConfig));
-    }
+    private final SelectDBConfig selectdbConfig;
 
     public SelectDBCommitter(SelectDBConfig selectdbConfig) {
         this.selectdbConfig = selectdbConfig;
diff --git a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index cd2d55d4fb..75a977d35d 100644
--- a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.selectdb.sink.writer;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -54,9 +52,9 @@ public class SelectDBSinkWriter
             SinkWriter.Context context,
             List<SelectDBSinkState> state,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig,
+            SelectDBConfig selectdbConfig,
             String jobId) {
-        this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
+        this.selectdbConfig = selectdbConfig;
         this.lastCheckpointId = state.size() != 0 ? 
state.get(0).getCheckpointId() : 0;
         log.info("restore checkpointId {}", lastCheckpointId);
         // filename prefix is uuid
diff --git a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
index 1a900d8c8b..dbda05aad1 100644
--- a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
+++ b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
@@ -64,12 +64,7 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc
     * be discovered by seatunnel-plugin-discovery todo: If these connectors implement the Factory
      * interface in the future, it should be removed from here
      */
-    private static final Set<String> EXCLUDE_CONNECTOR =
-            new HashSet() {
-                {
-                    add("SelectDBCloud");
-                }
-            };
+    private static final Set<String> EXCLUDE_CONNECTOR = new HashSet();
 
     /** All supported transforms. */
     private static final Set<String> TRANSFORMS =

Reply via email to