This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new efeed28aea [Improve][Connectors-v2] File sink refactor (#10587)
efeed28aea is described below
commit efeed28aeacf0e7568d0accbd2ec52db48eecb10
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 25 16:59:46 2026 +0800
[Improve][Connectors-v2] File sink refactor (#10587)
---
.../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java | 76 -----------
.../seatunnel/file/config/BaseFileSinkConfig.java | 141 +++++----------------
.../seatunnel/file/sink/BaseFileSink.java | 58 ++++-----
.../file/sink/BaseMultipleTableFileSink.java | 2 +-
.../seatunnel/file/sink/config/FileSinkConfig.java | 126 ++++++------------
.../file/writer/CsvWriteStrategyTest.java | 5 +-
.../seatunnel/file/writer/FileSinkConfigTest.java | 10 +-
.../file/writer/OrcWriteStrategyTest.java | 5 +-
.../file/writer/ParquetWriteStrategyTest.java | 5 +-
.../seatunnel/file/cos/config/CosConf.java | 16 +++
.../seatunnel/file/cos/sink/CosFileSink.java | 48 ++-----
.../file/cos/sink/CosFileSinkFactory.java | 7 +
.../seatunnel/file/oss/jindo/config/OssConf.java | 16 +++
.../seatunnel/file/oss/jindo/sink/OssFileSink.java | 48 ++-----
.../file/oss/jindo/sink/OssFileSinkFactory.java | 7 +
.../seatunnel/file/obs/config/ObsConf.java | 11 ++
.../seatunnel/file/obs/sink/ObsFileSink.java | 48 ++-----
.../file/obs/sink/ObsFileSinkFactory.java | 7 +
.../connectors/seatunnel/hive/sink/HiveSink.java | 6 +-
.../seatunnel/redshift/RedshiftJdbcClient.java | 14 +-
.../commit/S3RedshiftSinkAggregatedCommitter.java | 10 +-
...nfigOptions.java => S3RedshiftSinkOptions.java} | 2 +-
.../seatunnel/redshift/sink/S3RedshiftSink.java | 49 ++-----
...hiftFactory.java => S3RedshiftSinkFactory.java} | 19 ++-
.../e2e/sink/inmemory/InMemorySinkFactory.java | 4 +-
25 files changed, 251 insertions(+), 489 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
deleted file mode 100644
index 9304d9c360..0000000000
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-
-import java.util.Objects;
-
-import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-
-public abstract class BaseHdfsFileSink extends BaseFileSink {
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
FS_DEFAULT_NAME_KEY);
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- super.prepare(pluginConfig);
- // Avoid overwriting hadoopConf for subclass initialization. If a
subclass is initialized,
- // it is not initialized here.
- if (Objects.isNull(hadoopConf)) {
- hadoopConf = new
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
- }
- if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
- hadoopConf.setHdfsSitePath(
-
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
- }
-
- if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
-
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
- }
-
- if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
-
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
- }
-
- if
(pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
- hadoopConf.setKerberosPrincipal(
-
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
- }
- if
(pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
- hadoopConf.setKerberosKeytabPath(
-
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index b32711d413..b5f12771db 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
@@ -29,126 +27,57 @@ import lombok.NonNull;
import java.io.File;
import java.io.Serializable;
-import java.util.Locale;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@Data
public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
private static final long serialVersionUID = 1L;
- protected CompressFormat compressFormat =
FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
+ protected CompressFormat compressFormat;
protected String fieldDelimiter;
- protected int sheetMaxRows =
FileBaseSinkOptions.SHEET_MAX_ROWS.defaultValue();
- protected String rowDelimiter =
FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
- protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
+ protected int sheetMaxRows;
+ protected String rowDelimiter;
+ protected int batchSize;
protected String path;
- protected String fileNameExpression =
FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
- protected boolean singleFileMode =
FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
- protected boolean createEmptyFileWhenNoData =
- FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
+ protected String fileNameExpression;
+ protected boolean singleFileMode;
+ protected boolean createEmptyFileWhenNoData;
protected FileFormat fileFormat;
- protected String filenameExtension =
FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
- protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
- protected DateTimeUtils.Formatter datetimeFormat =
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
- protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+ protected String filenameExtension;
+ protected DateUtils.Formatter dateFormat;
+ protected DateTimeUtils.Formatter datetimeFormat;
+ protected TimeUtils.Formatter timeFormat;
protected Boolean enableHeaderWriter = false;
- public BaseFileSinkConfig(@NonNull Config config) {
- if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
- String compressCodec =
config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
- this.compressFormat =
CompressFormat.valueOf(compressCodec.toUpperCase());
- }
- if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
- this.batchSize =
config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.SHEET_MAX_ROWS.key())
- && StringUtils.isNotEmpty(
-
config.getString(FileBaseSinkOptions.SHEET_MAX_ROWS.key()))) {
- this.sheetMaxRows =
config.getInt(FileBaseSinkOptions.SHEET_MAX_ROWS.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
- this.rowDelimiter =
config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
- &&
!StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
- this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
- }
+ public BaseFileSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
+ this.compressFormat =
pluginConfig.get(FileBaseSinkOptions.COMPRESS_CODEC);
+ this.batchSize = pluginConfig.get(FileBaseSinkOptions.BATCH_SIZE);
+ this.sheetMaxRows =
pluginConfig.get(FileBaseSinkOptions.SHEET_MAX_ROWS);
+ this.rowDelimiter =
pluginConfig.get(FileBaseSinkOptions.ROW_DELIMITER);
+ this.path = pluginConfig.get(FileBaseSinkOptions.FILE_PATH);
checkNotNull(path);
-
if (path.equals(File.separator)) {
this.path = "";
}
-
- if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
- && !StringUtils.isBlank(
-
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
- this.fileNameExpression =
-
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
- this.singleFileMode =
config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
- }
-
- if
(config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
- this.createEmptyFileWhenNoData =
-
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
- && !StringUtils.isBlank(
-
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
- this.fileFormat =
- FileFormat.valueOf(
-
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
- .toUpperCase(Locale.ROOT));
- } else {
- // fall back to the default
- this.fileFormat =
FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
- }
-
- if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
- && StringUtils.isNotEmpty(
-
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
- this.fieldDelimiter =
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
+ this.fileNameExpression =
pluginConfig.get(FileBaseSinkOptions.FILE_NAME_EXPRESSION);
+ this.singleFileMode =
pluginConfig.get(FileBaseSinkOptions.SINGLE_FILE_MODE);
+ this.createEmptyFileWhenNoData =
+
pluginConfig.get(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA);
+ this.fileFormat =
pluginConfig.get(FileBaseSinkOptions.FILE_FORMAT_TYPE);
+ // if set, use user config, if not set, when format is csv, use ","
otherwise use default
+ // delimiter
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.FIELD_DELIMITER).isPresent()) {
+ this.fieldDelimiter =
pluginConfig.get(FileBaseSinkOptions.FIELD_DELIMITER);
+ } else if (FileFormat.CSV.equals(this.fileFormat)) {
+ this.fieldDelimiter = ",";
} else {
- if (FileFormat.CSV.equals(this.fileFormat)) {
- this.fieldDelimiter = ",";
- } else {
- this.fieldDelimiter =
FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
- }
- }
-
- if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
- && !StringUtils.isBlank(
-
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
- this.filenameExtension =
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key())) {
- dateFormat =
- DateUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key()));
- }
-
- if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key())) {
- datetimeFormat =
- DateTimeUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key()));
- }
-
- if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key())) {
- timeFormat =
- TimeUtils.Formatter.parse(
-
config.getString(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key()));
- }
-
- if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
- enableHeaderWriter =
config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
+ this.fieldDelimiter =
FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
}
+ this.filenameExtension =
pluginConfig.get(FileBaseSinkOptions.FILENAME_EXTENSION);
+ this.dateFormat =
pluginConfig.get(FileBaseSinkOptions.DATE_FORMAT_LEGACY);
+ this.datetimeFormat =
pluginConfig.get(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY);
+ this.timeFormat =
pluginConfig.get(FileBaseSinkOptions.TIME_FORMAT_LEGACY);
+ this.enableHeaderWriter =
pluginConfig.get(FileBaseSinkOptions.ENABLE_HEADER_WRITE);
}
public BaseFileSinkConfig() {}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 79c5e2424f..abe8e41ace 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -17,19 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
@@ -46,22 +42,36 @@ import java.util.Optional;
public abstract class BaseFileSink
implements SeaTunnelSink<
SeaTunnelRow, FileSinkState, FileCommitInfo,
FileAggregatedCommitInfo> {
- protected SeaTunnelRowType seaTunnelRowType;
- protected Config pluginConfig;
- protected HadoopConf hadoopConf;
+ protected ReadonlyConfig pluginConfig;
+ protected CatalogTable catalogTable;
protected FileSinkConfig fileSinkConfig;
+ protected HadoopConf hadoopConf;
protected JobContext jobContext;
protected String jobId;
+ public BaseFileSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
+ this.fileSinkConfig = new FileSinkConfig(pluginConfig,
catalogTable.getSeaTunnelRowType());
+ this.hadoopConf = initHadoopConf();
+ }
+
+ protected abstract HadoopConf initHadoopConf();
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
+ }
+
public void preCheckConfig() {
- if (pluginConfig.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())
- &&
pluginConfig.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key())
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.SINGLE_FILE_MODE).isPresent()
+ && pluginConfig.get(FileBaseSinkOptions.SINGLE_FILE_MODE)
&& jobContext.isEnableCheckpoint()) {
throw new IllegalArgumentException(
"Single file mode is not supported when checkpoint is
enabled or in streaming mode.");
}
- if
(pluginConfig.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
- &&
pluginConfig.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA).isPresent()
+ &&
pluginConfig.get(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
&& !fileSinkConfig.getPartitionFieldList().isEmpty()) {
throw new IllegalArgumentException(
"Generate empty file when no data is not supported when
partition is enabled.");
@@ -75,12 +85,6 @@ public abstract class BaseFileSink
preCheckConfig();
}
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
- this.fileSinkConfig = new FileSinkConfig(pluginConfig,
seaTunnelRowType);
- }
-
@Override
public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState>
restoreWriter(
SinkWriter.Context context, List<FileSinkState> states) {
@@ -114,24 +118,10 @@ public abstract class BaseFileSink
return Optional.of(new DefaultSerializer<>());
}
- /**
- * Use the pluginConfig to do some initialize operation.
- *
- * @param pluginConfig plugin config.
- * @throws PrepareFailException if plugin prepare failed, the {@link
PrepareFailException} will
- * throw.
- */
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- this.pluginConfig = pluginConfig;
- }
-
protected WriteStrategy createWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
- writeStrategy.setCatalogTable(
- CatalogTableUtil.getCatalogTable(
- "file", null, null, TablePath.DEFAULT.getTableName(),
seaTunnelRowType));
+ writeStrategy.setCatalogTable(catalogTable);
return writeStrategy;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 8ad7f4ab2d..cd2f9b1f10 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -69,7 +69,7 @@ public abstract class BaseMultipleTableFileSink
this.readonlyConfig = readonlyConfig;
this.hadoopConf = hadoopConf;
this.fileSinkConfig =
- new FileSinkConfig(readonlyConfig.toConfig(),
catalogTable.getSeaTunnelRowType());
+ new FileSinkConfig(readonlyConfig,
catalogTable.getSeaTunnelRowType());
this.catalogTable = catalogTable;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
index 8efa59e6ba..3a884c6e4b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSinkConfig;
@@ -54,16 +54,15 @@ public class FileSinkConfig extends BaseFileSinkConfig
implements PartitionConfi
private String partitionDirExpression;
- private boolean isPartitionFieldWriteInFile =
-
FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.defaultValue();
+ private boolean isPartitionFieldWriteInFile;
- private String tmpPath = FileBaseSinkOptions.TMP_PATH.defaultValue();
+ private String tmpPath;
- private String fileNameTimeFormat =
FileBaseSinkOptions.FILENAME_TIME_FORMAT.defaultValue();
+ private String fileNameTimeFormat;
- private boolean isEnableTransaction =
FileBaseSinkOptions.IS_ENABLE_TRANSACTION.defaultValue();
+ private boolean isEnableTransaction;
- private String encoding = FileBaseSinkOptions.ENCODING.defaultValue();
+ private String encoding;
// ---------------------generator by config params-------------------
@@ -75,32 +74,28 @@ public class FileSinkConfig extends BaseFileSinkConfig
implements PartitionConfi
private String sheetName;
- private String xmlRootTag =
FileBaseSinkOptions.XML_ROOT_TAG.defaultValue();
+ private String xmlRootTag;
- private String xmlRowTag = FileBaseSinkOptions.XML_ROW_TAG.defaultValue();
+ private String xmlRowTag;
private Boolean xmlUseAttrFormat;
- private Boolean parquetWriteTimestampAsInt96 =
-
FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.defaultValue();
+ private Boolean parquetWriteTimestampAsInt96;
- private List<String> parquetAvroWriteFixedAsInt96 =
-
FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.defaultValue();
+ private List<String> parquetAvroWriteFixedAsInt96;
- private CsvStringQuoteMode csvStringQuoteMode =
- FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.defaultValue();
+ private CsvStringQuoteMode csvStringQuoteMode;
- private Boolean mergeUpdateEvent =
FileBaseSinkOptions.MERGE_UPDATE_EVENT.defaultValue();
+ private Boolean mergeUpdateEvent;
- public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType
seaTunnelRowTypeInfo) {
- super(config);
+ public FileSinkConfig(
+ @NonNull ReadonlyConfig pluginConfig, @NonNull SeaTunnelRowType
seaTunnelRowTypeInfo) {
+ super(pluginConfig);
checkArgument(
!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
- if (config.hasPath(FileBaseSinkOptions.SINK_COLUMNS.key())
- && !CollectionUtils.isEmpty(
-
config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key()))) {
- this.sinkColumnList =
config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key());
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.SINK_COLUMNS).isPresent()) {
+ this.sinkColumnList =
pluginConfig.get(FileBaseSinkOptions.SINK_COLUMNS);
}
// if the config sink_columns is empty, all fields in
SeaTunnelRowTypeInfo will being write
@@ -111,44 +106,27 @@ public class FileSinkConfig extends BaseFileSinkConfig
implements PartitionConfi
new
ArrayList<>(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames()));
}
- if (config.hasPath(FileBaseSinkOptions.PARTITION_BY.key())) {
- this.partitionFieldList =
config.getStringList(FileBaseSinkOptions.PARTITION_BY.key());
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.PARTITION_BY).isPresent()) {
+ this.partitionFieldList =
pluginConfig.get(FileBaseSinkOptions.PARTITION_BY);
} else {
this.partitionFieldList = Collections.emptyList();
}
- if (config.hasPath(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key())
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION).isPresent()
&& !StringUtils.isBlank(
-
config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key()))) {
+
pluginConfig.get(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION))) {
this.partitionDirExpression =
-
config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key());
+
pluginConfig.get(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION);
}
- if
(config.hasPath(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key())) {
- this.isPartitionFieldWriteInFile =
-
config.getBoolean(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.TMP_PATH.key())
- &&
!StringUtils.isBlank(config.getString(FileBaseSinkOptions.TMP_PATH.key()))) {
- this.tmpPath =
config.getString(FileBaseSinkOptions.TMP_PATH.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key())
- && !StringUtils.isBlank(
-
config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key()))) {
- this.fileNameTimeFormat =
-
config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key());
- }
+ this.isPartitionFieldWriteInFile =
+
pluginConfig.get(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE);
- if (config.hasPath(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key())) {
- this.isEnableTransaction =
-
config.getBoolean(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key());
- }
+ this.tmpPath = pluginConfig.get(FileBaseSinkOptions.TMP_PATH);
- if (config.hasPath(FileBaseSinkOptions.ENCODING.key())) {
- this.encoding =
config.getString(FileBaseSinkOptions.ENCODING.key());
- }
+ this.fileNameTimeFormat =
pluginConfig.get(FileBaseSinkOptions.FILENAME_TIME_FORMAT);
+ this.isEnableTransaction =
pluginConfig.get(FileBaseSinkOptions.IS_ENABLE_TRANSACTION);
+ this.encoding = pluginConfig.get(FileBaseSinkOptions.ENCODING);
if (this.isEnableTransaction
&&
!this.fileNameExpression.contains(FileBaseSinkOptions.TRANSACTION_EXPRESSION)) {
@@ -204,60 +182,40 @@ public class FileSinkConfig extends BaseFileSinkConfig
implements PartitionConfi
.collect(Collectors.toList());
}
- if (config.hasPath(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key())) {
- this.maxRowsInMemory =
config.getInt(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key());
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY).isPresent()) {
+ this.maxRowsInMemory =
pluginConfig.get(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY);
}
- if (config.hasPath(FileBaseSinkOptions.SHEET_NAME.key())) {
- this.sheetName =
config.getString(FileBaseSinkOptions.SHEET_NAME.key());
+ if
(pluginConfig.getOptional(FileBaseSinkOptions.SHEET_NAME).isPresent()) {
+ this.sheetName = pluginConfig.get(FileBaseSinkOptions.SHEET_NAME);
}
if (FileFormat.XML.equals(this.fileFormat)) {
- if
(!config.hasPath(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key())) {
+ if
(!pluginConfig.getOptional(FileBaseSinkOptions.XML_USE_ATTR_FORMAT).isPresent())
{
throw new FileConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"User must define xml_use_attr_format when
file_format_type is xml");
}
- this.xmlUseAttrFormat =
-
config.getBoolean(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key());
-
- if (config.hasPath(FileBaseSinkOptions.XML_ROOT_TAG.key())) {
- this.xmlRootTag =
config.getString(FileBaseSinkOptions.XML_ROOT_TAG.key());
- }
-
- if (config.hasPath(FileBaseSinkOptions.XML_ROW_TAG.key())) {
- this.xmlRowTag =
config.getString(FileBaseSinkOptions.XML_ROW_TAG.key());
- }
+ this.xmlUseAttrFormat =
pluginConfig.get(FileBaseSinkOptions.XML_USE_ATTR_FORMAT);
+ this.xmlRootTag =
pluginConfig.get(FileBaseSinkOptions.XML_ROOT_TAG);
+ this.xmlRowTag = pluginConfig.get(FileBaseSinkOptions.XML_ROW_TAG);
}
if (FileFormat.PARQUET.equals(this.fileFormat)) {
- if
(config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key()))
{
- this.parquetWriteTimestampAsInt96 =
- config.getBoolean(
-
FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key());
- }
- if
(config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key())) {
- this.parquetAvroWriteFixedAsInt96 =
- config.getStringList(
-
FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key());
- }
+ this.parquetWriteTimestampAsInt96 =
+
pluginConfig.get(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96);
+ this.parquetAvroWriteFixedAsInt96 =
+
pluginConfig.get(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96);
}
if (FileFormat.CSV.equals(this.fileFormat)) {
- if
(config.hasPath(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key())) {
- this.csvStringQuoteMode =
- CsvStringQuoteMode.valueOf(
-
config.getString(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key()));
- }
+ this.csvStringQuoteMode =
pluginConfig.get(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE);
}
if (FileFormat.DEBEZIUM_JSON.equals(this.fileFormat)
|| FileFormat.CANAL_JSON.equals(this.fileFormat)
|| FileFormat.MAXWELL_JSON.equals(this.fileFormat)) {
- if (config.hasPath(FileBaseSinkOptions.MERGE_UPDATE_EVENT.key())) {
- this.mergeUpdateEvent =
-
config.getBoolean(FileBaseSinkOptions.MERGE_UPDATE_EVENT.key());
- }
+ this.mergeUpdateEvent =
pluginConfig.get(FileBaseSinkOptions.MERGE_UPDATE_EVENT);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
index 7202da3e01..55e33e194c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -63,7 +64,7 @@ public class CsvWriteStrategyTest {
BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.INT_TYPE
});
FileSinkConfig writeSinkConfig =
- new FileSinkConfig(ConfigFactory.parseMap(writeConfig),
writeRowType);
+ new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
ParquetReadStrategyTest.LocalConf hadoopConf =
new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
@@ -126,7 +127,7 @@ public class CsvWriteStrategyTest {
BasicType.INT_TYPE, BasicType.STRING_TYPE,
BasicType.INT_TYPE
});
FileSinkConfig writeSinkConfig =
- new FileSinkConfig(ConfigFactory.parseMap(writeConfig),
writeRowType);
+ new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
ParquetReadStrategyTest.LocalConf hadoopConf =
new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
index 16cf021c2c..db048b01b8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,7 +47,8 @@ public class FileSinkConfigTest {
new SeaTunnelRowType(
new String[] {"data", "ts"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE,
BasicType.STRING_TYPE});
- Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config,
rowType));
+ Assertions.assertDoesNotThrow(
+ () -> new FileSinkConfig(ReadonlyConfig.fromConfig(config),
rowType));
}
@Test
@@ -60,7 +62,8 @@ public class FileSinkConfigTest {
new SeaTunnelRowType(
new String[] {"data", "ts"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE,
BasicType.STRING_TYPE});
- Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config,
rowType));
+ Assertions.assertDoesNotThrow(
+ () -> new FileSinkConfig(ReadonlyConfig.fromConfig(config),
rowType));
}
@Test
@@ -76,7 +79,8 @@ public class FileSinkConfigTest {
new SeaTunnelDataType[] {
BasicType.STRING_TYPE, BasicType.INT_TYPE,
BasicType.STRING_TYPE
});
- FileSinkConfig fileSinkConfig = new FileSinkConfig(config,
seaTunnelRowTypeInfo);
+ FileSinkConfig fileSinkConfig =
+ new FileSinkConfig(ReadonlyConfig.fromConfig(config),
seaTunnelRowTypeInfo);
List<Integer> sinkColumnsIndexInRow =
fileSinkConfig.getSinkColumnsIndexInRow();
Assertions.assertEquals(
sinkColumnsIndexInRow.size(),
seaTunnelRowTypeInfo.getFieldNames().length);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
index 7dbc8bd9de..fa179d5097 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -64,7 +63,7 @@ public class OrcWriteStrategyTest {
BasicType.STRING_TYPE,
});
FileSinkConfig writeSinkConfig =
- new FileSinkConfig(ConfigFactory.parseMap(writeConfig),
writeRowType);
+ new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
OrcReadStrategyTest.LocalConf hadoopConf =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index e692d7294b..7fa6ced60d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.writer;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -79,7 +78,7 @@ public class ParquetWriteStrategyTest {
PrimitiveByteArrayType.INSTANCE
});
FileSinkConfig writeSinkConfig =
- new FileSinkConfig(ConfigFactory.parseMap(writeConfig),
writeRowType);
+ new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig),
writeRowType);
ParquetWriteStrategy writeStrategy = new
ParquetWriteStrategy(writeSinkConfig);
ParquetReadStrategyTest.LocalConf hadoopConf =
new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
index c85c755566..0f58007acd 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.cos.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.hadoop.fs.CosNConfigKeys;
@@ -57,4 +58,19 @@ public class CosConf extends HadoopConf {
hadoopConf.setExtraOptions(cosOptions);
return hadoopConf;
}
+
+ public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig
readonlyConfig) {
+ HadoopConf hadoopConf = new
CosConf(readonlyConfig.get(CosFileBaseOptions.BUCKET));
+ HashMap<String, String> cosOptions = new HashMap<>();
+ cosOptions.put(
+ CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY,
+ readonlyConfig.get(CosFileBaseOptions.SECRET_ID));
+ cosOptions.put(
+ CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY,
+ readonlyConfig.get(CosFileBaseOptions.SECRET_KEY));
+ cosOptions.put(
+ CosNConfigKeys.COSN_REGION_KEY,
readonlyConfig.get(CosFileBaseOptions.REGION));
+ hadoopConf.setExtraOptions(cosOptions);
+ return hadoopConf;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
index da1ed22197..eb7a875a91 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
@@ -17,56 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosFileSinkOptions;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
public class CosFileSink extends BaseFileSink {
- @Override
- public String getPluginName() {
- return FileSystemType.COS.getFileSystemPluginName();
+
+ public CosFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable)
{
+ super(pluginConfig, catalogTable);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- FileBaseOptions.FILE_PATH.key(),
- CosFileSinkOptions.REGION.key(),
- CosFileSinkOptions.SECRET_ID.key(),
- CosFileSinkOptions.SECRET_KEY.key(),
- CosFileSinkOptions.BUCKET.key());
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- hadoopConf = CosConf.buildWithConfig(pluginConfig);
+ protected HadoopConf initHadoopConf() {
+ return CosConf.buildWithReadonlyConfig(pluginConfig);
}
@Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ public String getPluginName() {
+ return FileSystemType.COS.getFileSystemPluginName();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index bdf82d54f9..df4b78f833 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -110,4 +112,9 @@ public class CosFileSinkFactory implements TableSinkFactory
{
.optional(FileBaseSinkOptions.TMP_PATH)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new CosFileSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
index 938a638484..8741087d31 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import java.util.HashMap;
@@ -55,4 +56,19 @@ public class OssConf extends HadoopConf {
hadoopConf.setExtraOptions(ossOptions);
return hadoopConf;
}
+
+ public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig
readonlyConfig) {
+ HadoopConf hadoopConf = new
OssConf(readonlyConfig.get(OssFileBaseOptions.BUCKET));
+ HashMap<String, String> ossOptions = new HashMap<>();
+ ossOptions.put("fs.AbstractFileSystem.oss.impl",
"com.aliyun.emr.fs.oss.OSS");
+ ossOptions.put("fs.oss.impl",
"com.aliyun.emr.fs.oss.JindoOssFileSystem");
+ ossOptions.put("fs.oss.accessKeyId",
readonlyConfig.get(OssFileBaseOptions.ACCESS_KEY));
+ ossOptions.put(
+ "fs.oss.accessKeySecret",
readonlyConfig.get(OssFileBaseOptions.ACCESS_SECRET));
+ ossOptions.put("fs.oss.endpoint",
readonlyConfig.get(OssFileBaseOptions.ENDPOINT));
+ ossOptions.put("fs.oss.upload.thread.concurrency", "20");
+ ossOptions.put("fs.oss.upload.queue.size", "100");
+ hadoopConf.setExtraOptions(ossOptions);
+ return hadoopConf;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
index 2405e34db2..f644fcac3c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
@@ -17,56 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssFileSinkOptions;
-import
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.exception.OssJindoConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
public class OssFileSink extends BaseFileSink {
- @Override
- public String getPluginName() {
- return FileSystemType.OSS_JINDO.getFileSystemPluginName();
+
+ public OssFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable)
{
+ super(pluginConfig, catalogTable);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- FileBaseOptions.FILE_PATH.key(),
- OssFileSinkOptions.ENDPOINT.key(),
- OssFileSinkOptions.ACCESS_KEY.key(),
- OssFileSinkOptions.ACCESS_SECRET.key(),
- OssFileSinkOptions.BUCKET.key());
- if (!result.isSuccess()) {
- throw new OssJindoConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- hadoopConf = OssConf.buildWithConfig(pluginConfig);
+ protected HadoopConf initHadoopConf() {
+ return OssConf.buildWithReadonlyConfig(pluginConfig);
}
@Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ public String getPluginName() {
+ return FileSystemType.OSS_JINDO.getFileSystemPluginName();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 41ced858e8..a4088712c7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -110,4 +112,9 @@ public class OssFileSinkFactory implements TableSinkFactory
{
.optional(FileBaseSinkOptions.TMP_PATH)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new OssFileSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
index e331146e55..9ca85e10b4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.obs.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.hadoop.fs.obs.Constants;
@@ -53,4 +54,14 @@ public class ObsConf extends HadoopConf {
hadoopConf.setExtraOptions(ossOptions);
return hadoopConf;
}
+
+ public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig
readonlyConfig) {
+ HadoopConf hadoopConf = new
ObsConf(readonlyConfig.get(ObsFileBaseOptions.BUCKET));
+ HashMap<String, String> ossOptions = new HashMap<>();
+ ossOptions.put(Constants.ACCESS_KEY,
readonlyConfig.get(ObsFileBaseOptions.ACCESS_KEY));
+ ossOptions.put(Constants.SECRET_KEY,
readonlyConfig.get(ObsFileBaseOptions.ACCESS_SECRET));
+ ossOptions.put(Constants.ENDPOINT,
readonlyConfig.get(ObsFileBaseOptions.ENDPOINT));
+ hadoopConf.setExtraOptions(ossOptions);
+ return hadoopConf;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
index 6212213200..bba854b1a9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -17,56 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
-import
org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsFileSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
public class ObsFileSink extends BaseFileSink {
- @Override
- public String getPluginName() {
- return FileSystemType.OBS.getFileSystemPluginName();
+
+ public ObsFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable)
{
+ super(pluginConfig, catalogTable);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- FileBaseOptions.FILE_PATH.key(),
- ObsFileSinkOptions.BUCKET.key(),
- ObsFileSinkOptions.ACCESS_KEY.key(),
- ObsFileSinkOptions.ACCESS_SECRET.key(),
- ObsFileSinkOptions.BUCKET.key());
- if (!result.isSuccess()) {
- throw new FileConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+ protected HadoopConf initHadoopConf() {
+ return ObsConf.buildWithReadonlyConfig(pluginConfig);
}
@Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ public String getPluginName() {
+ return FileSystemType.OBS.getFileSystemPluginName();
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
index a1cf8354ae..27c322e682 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -92,4 +94,9 @@ public class ObsFileSinkFactory implements TableSinkFactory {
.optional(FileBaseSinkOptions.FILENAME_EXTENSION)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new ObsFileSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index d693e98c9b..772acef2ef 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -151,7 +151,8 @@ public class HiveSink
ConfigValueFactory.fromAnyRef(
getDefaultTableLocation(readonlyConfig)));
- return new FileSinkConfig(pluginConfig,
catalogTable.getSeaTunnelRowType());
+ return new FileSinkConfig(
+ ReadonlyConfig.fromConfig(pluginConfig),
catalogTable.getSeaTunnelRowType());
}
List<String> sinkFields =
@@ -220,7 +221,8 @@ public class HiveSink
ConfigValueFactory.fromAnyRef("${transactionId}"));
}
- return new FileSinkConfig(pluginConfig,
catalogTable.getSeaTunnelRowType());
+ return new FileSinkConfig(
+ ReadonlyConfig.fromConfig(pluginConfig),
catalogTable.getSeaTunnelRowType());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
index 19b48f72b4..2dfc1ab448 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.redshift;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
import java.sql.Connection;
@@ -36,7 +35,7 @@ public class RedshiftJdbcClient {
private final Connection connection;
- public static RedshiftJdbcClient getInstance(Config config)
+ public static RedshiftJdbcClient getInstance(ReadonlyConfig config)
throws S3RedshiftJdbcConnectorException {
if (INSTANCE == null) {
synchronized (RedshiftJdbcClient.class) {
@@ -45,10 +44,9 @@ public class RedshiftJdbcClient {
try {
INSTANCE =
new RedshiftJdbcClient(
-
config.getString(S3RedshiftConfigOptions.JDBC_URL.key()),
-
config.getString(S3RedshiftConfigOptions.JDBC_USER.key()),
- config.getString(
-
S3RedshiftConfigOptions.JDBC_PASSWORD.key()));
+
config.get(S3RedshiftSinkOptions.JDBC_URL),
+
config.get(S3RedshiftSinkOptions.JDBC_USER),
+
config.get(S3RedshiftSinkOptions.JDBC_PASSWORD));
} catch (SQLException | ClassNotFoundException e) {
throw new S3RedshiftJdbcConnectorException(
CommonErrorCodeDeprecated.SQL_OPERATION_FAILED,
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 659282a924..a309b8c98c 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -17,15 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
-import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
@@ -43,12 +43,12 @@ public class S3RedshiftSinkAggregatedCommitter extends
FileSinkAggregatedCommitt
private final String executeSql;
- private Config pluginConfig;
+ private final ReadonlyConfig pluginConfig;
- public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config
pluginConfig) {
+ public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf,
ReadonlyConfig pluginConfig) {
super(hadoopConf);
this.pluginConfig = pluginConfig;
- this.executeSql =
pluginConfig.getString(S3RedshiftConfigOptions.EXECUTE_SQL.key());
+ this.executeSql = pluginConfig.get(S3RedshiftSinkOptions.EXECUTE_SQL);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
similarity index 96%
rename from
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
rename to
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
index f1a8240703..8a83a8167d 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
-public class S3RedshiftConfigOptions extends S3FileBaseOptions {
+public class S3RedshiftSinkOptions extends S3FileBaseOptions {
public static final Option<String> JDBC_URL =
Options.key("jdbc_url")
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
index 88bdeb9c7f..b56a9a9a7f 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -17,57 +17,32 @@
package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
-import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.redshift.commit.S3RedshiftSinkAggregatedCommitter;
-import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
-import
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
-
-import com.google.auto.service.AutoService;
import java.util.Optional;
-@AutoService(SeaTunnelSink.class)
-public class S3RedshiftSink extends BaseHdfsFileSink {
+public class S3RedshiftSink extends BaseFileSink {
+
+ public S3RedshiftSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ super(pluginConfig, catalogTable);
+ }
@Override
- public String getPluginName() {
- return "S3Redshift";
+ protected HadoopConf initHadoopConf() {
+ return S3HadoopConf.buildWithReadOnlyConfig(pluginConfig);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- S3FileBaseOptions.S3_BUCKET.key(),
- S3FileBaseOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
- S3RedshiftConfigOptions.JDBC_URL.key(),
- S3RedshiftConfigOptions.JDBC_USER.key(),
- S3RedshiftConfigOptions.JDBC_PASSWORD.key(),
- S3RedshiftConfigOptions.EXECUTE_SQL.key());
- if (!checkResult.isSuccess()) {
- throw new S3RedshiftJdbcConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
checkResult.getMsg()));
- }
- this.pluginConfig = pluginConfig;
- hadoopConf =
S3HadoopConf.buildWithReadOnlyConfig(ReadonlyConfig.fromConfig(pluginConfig));
+ public String getPluginName() {
+ return "S3Redshift";
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
similarity index 83%
rename from
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
rename to
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
index 816fef5c6e..bbd7f66cda 100644
---
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
@@ -18,18 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
-import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
import com.google.auto.service.AutoService;
@AutoService(Factory.class)
-public class S3RedshiftFactory implements TableSinkFactory {
+public class S3RedshiftSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
@@ -41,10 +43,10 @@ public class S3RedshiftFactory implements TableSinkFactory {
return OptionRule.builder()
.required(
S3FileBaseOptions.S3_BUCKET,
- S3RedshiftConfigOptions.JDBC_URL,
- S3RedshiftConfigOptions.JDBC_USER,
- S3RedshiftConfigOptions.JDBC_PASSWORD,
- S3RedshiftConfigOptions.EXECUTE_SQL,
+ S3RedshiftSinkOptions.JDBC_URL,
+ S3RedshiftSinkOptions.JDBC_USER,
+ S3RedshiftSinkOptions.JDBC_PASSWORD,
+ S3RedshiftSinkOptions.EXECUTE_SQL,
FileBaseSourceOptions.FILE_PATH,
S3FileBaseOptions.S3A_AWS_CREDENTIALS_PROVIDER)
.conditional(
@@ -71,4 +73,9 @@ public class S3RedshiftFactory implements TableSinkFactory {
.optional(FileBaseSinkOptions.FILE_NAME_EXPRESSION)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new S3RedshiftSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index e8722bb74b..674d69584c 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -30,6 +30,7 @@ import com.google.auto.service.AutoService;
import java.util.List;
+import static
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@AutoService(Factory.class)
@@ -73,7 +74,8 @@ public class InMemorySinkFactory
CHECKPOINT_SLEEP,
THROW_EXCEPTION_OF_COMMITTER,
ASSERT_OPTIONS_KEY,
- ASSERT_OPTIONS_VALUE)
+ ASSERT_OPTIONS_VALUE,
+ MULTI_TABLE_SINK_REPLICA)
.build();
}