This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1b44b9b440 [improve] selectdb options (#9252)
1b44b9b440 is described below
commit 1b44b9b440c7ec52e6f85fbf0a28f4a51c32b39a
Author: Jarvis <[email protected]>
AuthorDate: Mon Jul 7 16:24:51 2025 +0800
[improve] selectdb options (#9252)
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 1 -
.../connectors/selectdb/config/SelectDBConfig.java | 165 +++------------------
...electDBConfig.java => SelectDBSinkOptions.java} | 101 ++-----------
.../connectors/selectdb/sink/SelectDBSink.java | 69 +++------
.../selectdb/sink/SelectDBSinkFactory.java | 63 ++++++++
.../selectdb/sink/committer/SelectDBCommitter.java | 7 +-
.../selectdb/sink/writer/SelectDBSinkWriter.java | 6 +-
.../starter/seatunnel/SeaTunnelConnectorTest.java | 7 +-
8 files changed, 119 insertions(+), 300 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index d433102de5..21a64a0c25 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -191,7 +191,6 @@ public class ConnectorOptionCheckTest {
Set<String> whiteList = new HashSet<>();
whiteList.add("JdbcSinkOptions");
whiteList.add("MongodbSinkOptions");
- whiteList.add("SelectDBSinkOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
index 50c3442c6b..8d6ba4c5b6 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
@@ -17,105 +17,18 @@
package org.apache.seatunnel.connectors.selectdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
@Setter
@Getter
@ToString
public class SelectDBConfig {
- private static final int DEFAULT_SINK_MAX_RETRIES = 3;
- private static final int DEFAULT_SINK_BUFFER_SIZE = 10 * 1024 * 1024;
- private static final int DEFAULT_SINK_BUFFER_COUNT = 10000;
- // common option
- public static final Option<String> LOAD_URL =
- Options.key("load-url")
- .stringType()
- .noDefaultValue()
- .withDescription("SelectDB load http address.");
- public static final Option<String> JDBC_URL =
- Options.key("jdbc-url")
- .stringType()
- .noDefaultValue()
- .withDescription("SelectDB jdbc query address.");
- public static final Option<String> CLUSTER_NAME =
- Options.key("cluster-name")
- .stringType()
- .noDefaultValue()
- .withDescription("SelectDB cluster name.");
-
- public static final Option<String> TABLE_IDENTIFIER =
- Options.key("table.identifier")
- .stringType()
- .noDefaultValue()
- .withDescription("the jdbc table name.");
- public static final Option<String> USERNAME =
- Options.key("username")
- .stringType()
- .noDefaultValue()
- .withDescription("the jdbc user name.");
- public static final Option<String> PASSWORD =
- Options.key("password")
- .stringType()
- .noDefaultValue()
- .withDescription("the jdbc password.");
-
- public static final Option<Boolean> SINK_ENABLE_2PC =
- Options.key("sink.enable-2pc")
- .booleanType()
- .defaultValue(true)
- .withDescription("enable 2PC while loading");
- // sink config options
- public static final Option<Integer> SINK_MAX_RETRIES =
- Options.key("sink.max-retries")
- .intType()
- .defaultValue(DEFAULT_SINK_MAX_RETRIES)
- .withDescription("the max retry times if writing records
to database failed.");
- public static final Option<Integer> SINK_BUFFER_SIZE =
- Options.key("sink.buffer-size")
- .intType()
- .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
- .withDescription("the buffer size to cache data for stream
load.");
- public static final Option<Integer> SINK_BUFFER_COUNT =
- Options.key("sink.buffer-count")
- .intType()
- .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
- .withDescription("the buffer count to cache data for
stream load.");
- public static final Option<String> SINK_LABEL_PREFIX =
- Options.key("sink.label-prefix")
- .stringType()
- .defaultValue(UUID.randomUUID().toString())
- .withDescription("the unique label prefix.");
- public static final Option<Boolean> SINK_ENABLE_DELETE =
- Options.key("sink.enable-delete")
- .booleanType()
- .defaultValue(false)
- .withDescription("whether to enable the delete function");
-
- public static final Option<Integer> SINK_FLUSH_QUEUE_SIZE =
- Options.key("sink.flush.queue-size")
- .intType()
- .defaultValue(1)
- .withDescription("Queue length for async upload to object
storage");
-
- public static final Option<Map<String, String>>
SELECTDB_SINK_CONFIG_PREFIX =
- Options.key("selectdb.config")
- .mapType()
- .noDefaultValue()
- .withDescription(
- "The parameter of the Copy Into data_desc. "
- + "The way to specify the parameter is to
add the prefix `selectdb.config` to the original load parameter name ");
private String loadUrl;
private String jdbcUrl;
@@ -132,65 +45,27 @@ public class SelectDBConfig {
private Integer flushQueueSize;
private Properties StageLoadProps;
- public static SelectDBConfig loadConfig(Config pluginConfig) {
+ public static SelectDBConfig loadConfig(ReadonlyConfig pluginConfig) {
SelectDBConfig selectdbConfig = new SelectDBConfig();
- selectdbConfig.setLoadUrl(pluginConfig.getString(LOAD_URL.key()));
- selectdbConfig.setJdbcUrl(pluginConfig.getString(JDBC_URL.key()));
-
selectdbConfig.setClusterName(pluginConfig.getString(CLUSTER_NAME.key()));
- selectdbConfig.setUsername(pluginConfig.getString(USERNAME.key()));
- selectdbConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-
selectdbConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
-
selectdbConfig.setStageLoadProps(parseCopyIntoProperties(pluginConfig));
-
- if (pluginConfig.hasPath(SINK_LABEL_PREFIX.key())) {
-
selectdbConfig.setLabelPrefix(pluginConfig.getString(SINK_LABEL_PREFIX.key()));
- } else {
- selectdbConfig.setLabelPrefix(SINK_LABEL_PREFIX.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_MAX_RETRIES.key())) {
-
selectdbConfig.setMaxRetries(pluginConfig.getInt(SINK_MAX_RETRIES.key()));
- } else {
- selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
-
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
- } else {
- selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
-
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
- } else {
- selectdbConfig.setBufferSize(SINK_BUFFER_SIZE.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_BUFFER_COUNT.key())) {
-
selectdbConfig.setBufferCount(pluginConfig.getInt(SINK_BUFFER_COUNT.key()));
- } else {
- selectdbConfig.setBufferCount(SINK_BUFFER_COUNT.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_ENABLE_DELETE.key())) {
-
selectdbConfig.setEnableDelete(pluginConfig.getBoolean(SINK_ENABLE_DELETE.key()));
- } else {
- selectdbConfig.setEnableDelete(SINK_ENABLE_DELETE.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_FLUSH_QUEUE_SIZE.key())) {
-
selectdbConfig.setFlushQueueSize(pluginConfig.getInt(SINK_FLUSH_QUEUE_SIZE.key()));
- } else {
-
selectdbConfig.setFlushQueueSize(SINK_FLUSH_QUEUE_SIZE.defaultValue());
+
selectdbConfig.setLoadUrl(pluginConfig.get(SelectDBSinkOptions.LOAD_URL));
+
selectdbConfig.setJdbcUrl(pluginConfig.get(SelectDBSinkOptions.JDBC_URL));
+
selectdbConfig.setClusterName(pluginConfig.get(SelectDBSinkOptions.CLUSTER_NAME));
+
selectdbConfig.setUsername(pluginConfig.get(SelectDBSinkOptions.USERNAME));
+
selectdbConfig.setPassword(pluginConfig.get(SelectDBSinkOptions.PASSWORD));
+
selectdbConfig.setTableIdentifier(pluginConfig.get(SelectDBSinkOptions.TABLE_IDENTIFIER));
+ if (pluginConfig.getOptional(SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX).isPresent()) {
+ Properties properties = new Properties();
+
properties.putAll(pluginConfig.get(SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX));
+ selectdbConfig.setStageLoadProps(properties);
}
+
selectdbConfig.setLabelPrefix(pluginConfig.get(SelectDBSinkOptions.SINK_LABEL_PREFIX));
+
selectdbConfig.setMaxRetries(pluginConfig.get(SelectDBSinkOptions.SINK_MAX_RETRIES));
+
selectdbConfig.setEnable2PC(pluginConfig.get(SelectDBSinkOptions.SINK_ENABLE_2PC));
+
selectdbConfig.setBufferSize(pluginConfig.get(SelectDBSinkOptions.SINK_BUFFER_SIZE));
+
selectdbConfig.setBufferCount(pluginConfig.get(SelectDBSinkOptions.SINK_BUFFER_COUNT));
+
selectdbConfig.setEnableDelete(pluginConfig.get(SelectDBSinkOptions.SINK_ENABLE_DELETE));
+ selectdbConfig.setFlushQueueSize(
+ pluginConfig.get(SelectDBSinkOptions.SINK_FLUSH_QUEUE_SIZE));
return selectdbConfig;
}
-
- private static Properties parseCopyIntoProperties(Config pluginConfig) {
- Properties stageLoadProps = new Properties();
- if (CheckConfigUtil.isValidParam(pluginConfig,
SELECTDB_SINK_CONFIG_PREFIX.key())) {
- pluginConfig
- .getObject(SELECTDB_SINK_CONFIG_PREFIX.key())
- .forEach(
- (key, value) -> {
- final String configKey = key.toLowerCase();
- stageLoadProps.put(configKey,
value.unwrapped().toString());
- });
- }
- return stageLoadProps;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
similarity index 56%
copy from
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
copy to
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
index 50c3442c6b..7afb627c81 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBConfig.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/config/SelectDBSinkOptions.java
@@ -17,24 +17,16 @@
package org.apache.seatunnel.connectors.selectdb.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
import java.util.Map;
-import java.util.Properties;
import java.util.UUID;
-@Setter
-@Getter
-@ToString
-public class SelectDBConfig {
+public class SelectDBSinkOptions {
+
+ public static final String IDENTIFIER = "SelectDBCloud";
+
private static final int DEFAULT_SINK_MAX_RETRIES = 3;
private static final int DEFAULT_SINK_BUFFER_SIZE = 10 * 1024 * 1024;
private static final int DEFAULT_SINK_BUFFER_COUNT = 10000;
@@ -44,11 +36,13 @@ public class SelectDBConfig {
.stringType()
.noDefaultValue()
.withDescription("SelectDB load http address.");
+
public static final Option<String> JDBC_URL =
Options.key("jdbc-url")
.stringType()
.noDefaultValue()
.withDescription("SelectDB jdbc query address.");
+
public static final Option<String> CLUSTER_NAME =
Options.key("cluster-name")
.stringType()
@@ -60,11 +54,13 @@ public class SelectDBConfig {
.stringType()
.noDefaultValue()
.withDescription("the jdbc table name.");
+
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("the jdbc user name.");
+
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
@@ -82,21 +78,25 @@ public class SelectDBConfig {
.intType()
.defaultValue(DEFAULT_SINK_MAX_RETRIES)
.withDescription("the max retry times if writing records
to database failed.");
+
public static final Option<Integer> SINK_BUFFER_SIZE =
Options.key("sink.buffer-size")
.intType()
.defaultValue(DEFAULT_SINK_BUFFER_SIZE)
.withDescription("the buffer size to cache data for stream
load.");
+
public static final Option<Integer> SINK_BUFFER_COUNT =
Options.key("sink.buffer-count")
.intType()
.defaultValue(DEFAULT_SINK_BUFFER_COUNT)
.withDescription("the buffer count to cache data for
stream load.");
+
public static final Option<String> SINK_LABEL_PREFIX =
Options.key("sink.label-prefix")
.stringType()
.defaultValue(UUID.randomUUID().toString())
.withDescription("the unique label prefix.");
+
public static final Option<Boolean> SINK_ENABLE_DELETE =
Options.key("sink.enable-delete")
.booleanType()
@@ -116,81 +116,4 @@ public class SelectDBConfig {
.withDescription(
"The parameter of the Copy Into data_desc. "
+ "The way to specify the parameter is to add the prefix `selectdb.config` to the original load parameter name ");
-
- private String loadUrl;
- private String jdbcUrl;
- private String clusterName;
- private String username;
- private String password;
- private String tableIdentifier;
- private Boolean enableDelete;
- private String labelPrefix;
- private boolean enable2PC;
- private Integer maxRetries;
- private Integer bufferSize;
- private Integer bufferCount;
- private Integer flushQueueSize;
- private Properties StageLoadProps;
-
- public static SelectDBConfig loadConfig(Config pluginConfig) {
- SelectDBConfig selectdbConfig = new SelectDBConfig();
- selectdbConfig.setLoadUrl(pluginConfig.getString(LOAD_URL.key()));
- selectdbConfig.setJdbcUrl(pluginConfig.getString(JDBC_URL.key()));
-
selectdbConfig.setClusterName(pluginConfig.getString(CLUSTER_NAME.key()));
- selectdbConfig.setUsername(pluginConfig.getString(USERNAME.key()));
- selectdbConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-
selectdbConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
-
selectdbConfig.setStageLoadProps(parseCopyIntoProperties(pluginConfig));
-
- if (pluginConfig.hasPath(SINK_LABEL_PREFIX.key())) {
-
selectdbConfig.setLabelPrefix(pluginConfig.getString(SINK_LABEL_PREFIX.key()));
- } else {
- selectdbConfig.setLabelPrefix(SINK_LABEL_PREFIX.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_MAX_RETRIES.key())) {
-
selectdbConfig.setMaxRetries(pluginConfig.getInt(SINK_MAX_RETRIES.key()));
- } else {
- selectdbConfig.setMaxRetries(SINK_MAX_RETRIES.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_ENABLE_2PC.key())) {
-
selectdbConfig.setEnable2PC(pluginConfig.getBoolean(SINK_ENABLE_2PC.key()));
- } else {
- selectdbConfig.setEnable2PC(SINK_ENABLE_2PC.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_BUFFER_SIZE.key())) {
-
selectdbConfig.setBufferSize(pluginConfig.getInt(SINK_BUFFER_SIZE.key()));
- } else {
- selectdbConfig.setBufferSize(SINK_BUFFER_SIZE.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_BUFFER_COUNT.key())) {
-
selectdbConfig.setBufferCount(pluginConfig.getInt(SINK_BUFFER_COUNT.key()));
- } else {
- selectdbConfig.setBufferCount(SINK_BUFFER_COUNT.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_ENABLE_DELETE.key())) {
-
selectdbConfig.setEnableDelete(pluginConfig.getBoolean(SINK_ENABLE_DELETE.key()));
- } else {
- selectdbConfig.setEnableDelete(SINK_ENABLE_DELETE.defaultValue());
- }
- if (pluginConfig.hasPath(SINK_FLUSH_QUEUE_SIZE.key())) {
-
selectdbConfig.setFlushQueueSize(pluginConfig.getInt(SINK_FLUSH_QUEUE_SIZE.key()));
- } else {
-
selectdbConfig.setFlushQueueSize(SINK_FLUSH_QUEUE_SIZE.defaultValue());
- }
- return selectdbConfig;
- }
-
- private static Properties parseCopyIntoProperties(Config pluginConfig) {
- Properties stageLoadProps = new Properties();
- if (CheckConfigUtil.isValidParam(pluginConfig,
SELECTDB_SINK_CONFIG_PREFIX.key())) {
- pluginConfig
- .getObject(SELECTDB_SINK_CONFIG_PREFIX.key())
- .forEach(
- (key, value) -> {
- final String configKey = key.toLowerCase();
- stageLoadProps.put(configKey,
value.unwrapped().toString());
- });
- }
- return stageLoadProps;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
index 33222116cc..6c3e5bf76d 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSink.java
@@ -17,11 +17,8 @@
package org.apache.seatunnel.connectors.selectdb.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
@@ -29,11 +26,8 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBSinkOptions;
import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfo;
import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitInfoSerializer;
import
org.apache.seatunnel.connectors.selectdb.sink.committer.SelectDBCommitter;
@@ -41,50 +35,27 @@ import
org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkState;
import
org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkStateSerializer;
import org.apache.seatunnel.connectors.selectdb.sink.writer.SelectDBSinkWriter;
-import com.google.auto.service.AutoService;
-
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.CLUSTER_NAME;
-import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.JDBC_URL;
-import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.LOAD_URL;
-import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.TABLE_IDENTIFIER;
-import static
org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig.USERNAME;
-
-@AutoService(SeaTunnelSink.class)
public class SelectDBSink
implements SeaTunnelSink<
SeaTunnelRow, SelectDBSinkState, SelectDBCommitInfo,
SelectDBCommitInfo> {
- private Config pluginConfig;
- private SeaTunnelRowType seaTunnelRowType;
+
+ private final SelectDBConfig dbConfig;
+ private final CatalogTable catalogTable;
private String jobId;
- @Override
- public String getPluginName() {
- return "SelectDBCloud";
+ public SelectDBSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+ this.dbConfig = SelectDBConfig.loadConfig(pluginConfig);
+ this.catalogTable = catalogTable;
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- JDBC_URL.key(),
- LOAD_URL.key(),
- CLUSTER_NAME.key(),
- USERNAME.key(),
- TABLE_IDENTIFIER.key());
- if (!result.isSuccess()) {
- throw new SelectDBConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
+ public String getPluginName() {
+ return SelectDBSinkOptions.IDENTIFIER;
}
@Override
@@ -92,17 +63,16 @@ public class SelectDBSink
this.jobId = jobContext.getJobId();
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- }
-
@Override
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState>
createWriter(
SinkWriter.Context context) throws IOException {
SelectDBSinkWriter selectDBSinkWriter =
new SelectDBSinkWriter(
- context, Collections.emptyList(), seaTunnelRowType,
pluginConfig, jobId);
+ context,
+ Collections.emptyList(),
+ catalogTable.getSeaTunnelRowType(),
+ dbConfig,
+ jobId);
selectDBSinkWriter.initializeLoad(Collections.emptyList());
return selectDBSinkWriter;
}
@@ -111,7 +81,8 @@ public class SelectDBSink
public SinkWriter<SeaTunnelRow, SelectDBCommitInfo, SelectDBSinkState>
restoreWriter(
SinkWriter.Context context, List<SelectDBSinkState> states) throws
IOException {
SelectDBSinkWriter selectDBSinkWriter =
- new SelectDBSinkWriter(context, states, seaTunnelRowType,
pluginConfig, jobId);
+ new SelectDBSinkWriter(
+ context, states, catalogTable.getSeaTunnelRowType(),
dbConfig, jobId);
selectDBSinkWriter.initializeLoad(states);
return selectDBSinkWriter;
}
@@ -123,7 +94,7 @@ public class SelectDBSink
@Override
public Optional<SinkCommitter<SelectDBCommitInfo>> createCommitter()
throws IOException {
- return Optional.of(new SelectDBCommitter(pluginConfig));
+ return Optional.of(new SelectDBCommitter(dbConfig));
}
@Override
@@ -144,6 +115,6 @@ public class SelectDBSink
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return SeaTunnelSink.super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java
new file mode 100644
index 0000000000..289a9406ec
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/SelectDBSinkFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.selectdb.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.selectdb.config.SelectDBSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class SelectDBSinkFactory implements TableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return SelectDBSinkOptions.IDENTIFIER;
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(
+ SelectDBSinkOptions.JDBC_URL,
+ SelectDBSinkOptions.LOAD_URL,
+ SelectDBSinkOptions.CLUSTER_NAME,
+ SelectDBSinkOptions.USERNAME,
+ SelectDBSinkOptions.TABLE_IDENTIFIER)
+ .optional(
+ SelectDBSinkOptions.PASSWORD,
+ SelectDBSinkOptions.SINK_ENABLE_2PC,
+ SelectDBSinkOptions.SINK_MAX_RETRIES,
+ SelectDBSinkOptions.SINK_BUFFER_SIZE,
+ SelectDBSinkOptions.SINK_BUFFER_COUNT,
+ SelectDBSinkOptions.SINK_LABEL_PREFIX,
+ SelectDBSinkOptions.SINK_ENABLE_DELETE,
+ SelectDBSinkOptions.SINK_FLUSH_QUEUE_SIZE,
+ SelectDBSinkOptions.SELECTDB_SINK_CONFIG_PREFIX)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new SelectDBSink(context.getOptions(), context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
index 9fea3e2ebf..cbb33474b8 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/committer/SelectDBCommitter.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.selectdb.sink.committer;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.selectdb.config.SelectDBConfig;
import org.apache.seatunnel.connectors.selectdb.rest.CopySQLUtil;
@@ -31,11 +29,8 @@ import java.util.List;
@Slf4j
public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
- private final SelectDBConfig selectdbConfig;
- public SelectDBCommitter(Config pluginConfig) {
- this(SelectDBConfig.loadConfig(pluginConfig));
- }
+ private final SelectDBConfig selectdbConfig;
public SelectDBCommitter(SelectDBConfig selectdbConfig) {
this.selectdbConfig = selectdbConfig;
diff --git
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
index cd2d55d4fb..75a977d35d 100644
---
a/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connectors/selectdb/sink/writer/SelectDBSinkWriter.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.selectdb.sink.writer;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -54,9 +52,9 @@ public class SelectDBSinkWriter
SinkWriter.Context context,
List<SelectDBSinkState> state,
SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig,
+ SelectDBConfig selectdbConfig,
String jobId) {
- this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
+ this.selectdbConfig = selectdbConfig;
this.lastCheckpointId = state.size() != 0 ?
state.get(0).getCheckpointId() : 0;
log.info("restore checkpointId {}", lastCheckpointId);
// filename prefix is uuid
diff --git
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
index 1a900d8c8b..dbda05aad1 100644
---
a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
+++
b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java
@@ -64,12 +64,7 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc
* be discovered by seatunnel-plugin-discovery todo: If these connectors implement the Factory
* interface in the future, it should be removed from here
*/
- private static final Set<String> EXCLUDE_CONNECTOR =
- new HashSet() {
- {
- add("SelectDBCloud");
- }
- };
+ private static final Set<String> EXCLUDE_CONNECTOR = new HashSet();
/** All supported transforms. */
private static final Set<String> TRANSFORMS =