This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bb4f743c05 [Feature][Connector-File-Hadoop]Support multi table sink feature for HdfsFile (#9651)
bb4f743c05 is described below
commit bb4f743c05428b7a148acec65770e742f4aa984a
Author: JeremyXin <[email protected]>
AuthorDate: Mon Aug 11 01:13:30 2025 +0800
[Feature][Connector-File-Hadoop]Support multi table sink feature for HdfsFile (#9651)
---
.github/workflows/backend.yml | 2 +-
docs/en/connector-v2/sink/HdfsFile.md | 1 +
docs/zh/connector-v2/sink/HdfsFile.md | 1 +
.../seatunnel/sink/SinkFlowTestUtils.java | 35 ++-
.../connector-file/connector-file-hadoop/pom.xml | 7 +
.../HdfsFileSinkOptions.java} | 32 +--
.../seatunnel/file/hdfs/sink/HdfsFileSink.java | 27 +--
.../file/hdfs/sink/HdfsFileSinkFactory.java | 72 +++++-
.../seatunnel/file/hdfs/HdfsFileSinkTest.java | 266 +++++++++++++++++++++
9 files changed, 391 insertions(+), 52 deletions(-)
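
For context, a condensed sketch (illustrative only, not part of the patch) of what this change enables, distilled from the new HdfsFileSinkTest at the end of this mail: one HdfsFileSink per target table, wrapped into a single sink by MultiTableSinkFactory, with rows routed via SeaTunnelRow.setTableId(...). The table ids, paths, and option maps below are placeholders; every API used appears in the hunks that follow.

    // Illustrative sketch, condensed from HdfsFileSinkTest in this commit.
    import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    import org.apache.seatunnel.api.sink.SeaTunnelSink;
    import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
    import org.apache.seatunnel.api.table.catalog.CatalogTable;
    import org.apache.seatunnel.api.table.catalog.TablePath;
    import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
    import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
    import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSink;

    import java.util.HashMap;
    import java.util.Map;

    class MultiTableHdfsSinkSketch {
        SeaTunnelSink<?, ?, ?, ?> build(
                CatalogTable catalogTable,
                Map<String, Object> table1Options, // placeholder per-table options
                Map<String, Object> table2Options,
                Map<String, Object> commonOptions) {
            HadoopConf hadoopConf = new HadoopConf("file:///"); // placeholder fs.defaultFS
            // One sink per target table, keyed by table path.
            Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
            sinks.put(
                    TablePath.of("test.table1"),
                    new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table1Options), catalogTable));
            sinks.put(
                    TablePath.of("test.table2"),
                    new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table2Options), catalogTable));
            // The multi-table factory folds the per-table sinks into one sink;
            // each SeaTunnelRow is dispatched by its table id.
            MultiTableFactoryContext context =
                    new MultiTableFactoryContext(
                            ReadonlyConfig.fromMap(commonOptions),
                            Thread.currentThread().getContextClassLoader(),
                            sinks);
            return new MultiTableSinkFactory().createSink(context).createSink();
        }
    }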
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index d5e54feb82..04c21c395d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -480,7 +480,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 150
+    timeout-minutes: 200
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 3c72f4b2c2..e4cd56bf4e 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -17,6 +17,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 Use binary file format to read and write files in any format, such as videos, pictures, etc. In short, any files can be synchronized to the target place.
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 By default, we use 2PC commit to ensure `exactly-once`
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md b/docs/zh/connector-v2/sink/HdfsFile.md
index 8746a68328..384a20b7f4 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -17,6 +17,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
 
 - [x] [精确一次](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
 
 默认情况下,我们使用2PC提交来确保"精确一次"
diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index 40c440f924..982ad50f67 100644
--- a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -78,6 +78,19 @@ public class SinkFlowTestUtils {
         runWithContext(catalogTable, options, factory, rows, context, parallelism);
     }
 
+    public static void runBatchWithMultiTableSink(
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            TableSinkFactoryContext tableSinkFactoryContext,
+            List<SeaTunnelRow> rows,
+            boolean checkpointEnabled,
+            int parallelism)
+            throws IOException {
+        JobContext context = new JobContext(System.currentTimeMillis());
+        context.setJobMode(JobMode.BATCH);
+        context.setEnableCheckpoint(checkpointEnabled);
+        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+    }
+
     private static void runWithContext(
             CatalogTable catalogTable,
             ReadonlyConfig options,
@@ -86,13 +99,23 @@ public class SinkFlowTestUtils {
             JobContext context,
             int parallelism)
             throws IOException {
+
+        TableSinkFactoryContext tableSinkFactoryContext =
+                new TableSinkFactoryContext(
+                        catalogTable, options, Thread.currentThread().getContextClassLoader());
+
+        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+    }
+
+    private static void runWithContext(
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            TableSinkFactoryContext tableSinkFactoryContext,
+            List<SeaTunnelRow> rows,
+            JobContext context,
+            int parallelism)
+            throws IOException {
         SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink =
-                factory.createSink(
-                                new TableSinkFactoryContext(
-                                        catalogTable,
-                                        options,
-                                        Thread.currentThread().getContextClassLoader()))
-                        .createSink();
+                factory.createSink(tableSinkFactoryContext).createSink();
         sink.setJobContext(context);
         List<Object> commitInfos = new ArrayList<>();
         for (int i = 0; i < parallelism; i++) {
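
The new overload is exercised by HdfsFileSinkTest at the end of this mail; the call shape is simply (illustrative fragment, with checkpointing disabled and parallelism 1):

    // factory plus a prebuilt MultiTableFactoryContext, as in testTextFormatWithMultiTableSink
    SinkFlowTestUtils.runBatchWithMultiTableSink(
            new MultiTableSinkFactory(), multiTableContext, rows, false, 1);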
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index 59da8bf844..8e3d641998 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -48,6 +48,13 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
similarity index 50%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
index 5e098ea2d2..e53d07c6ac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
@@ -15,34 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseHdfsFileSink {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-    }
-
-    @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
-    }
-}
+public class HdfsFileSinkOptions extends FileBaseSinkOptions {}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 5e098ea2d2..c019211c52 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,32 +17,31 @@
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
 import java.util.Optional;
 
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseHdfsFileSink {
+public class HdfsFileSink extends BaseMultipleTableFileSink {
 
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
+    private final CatalogTable catalogTable;
+
+    public HdfsFileSink(
+            HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        super(hadoopConf, readonlyConfig, catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
+    public String getPluginName() {
+        return FileSystemType.HDFS.getFileSystemPluginName();
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.ofNullable(catalogTable);
     }
 }
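
With @AutoService moved off the sink class (registration now lives on the factory below), the sink is constructed directly through its new public constructor. A minimal sketch, illustrative only: catalogTable is an assumed fixture like the one built in HdfsFileSinkTest at the end of this mail, and the path values are placeholders.

    // Illustrative sketch of direct construction (mirrors the new test).
    import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    import org.apache.seatunnel.api.table.catalog.CatalogTable;
    import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
    import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
    import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
    import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSink;
    import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;

    import java.util.HashMap;
    import java.util.Map;

    class HdfsSinkConstructionSketch {
        HdfsFileSink newTextSink(CatalogTable catalogTable) {
            Map<String, Object> options = new HashMap<>();
            options.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), "file:///"); // placeholder FS
            options.put(FileBaseSinkOptions.FILE_PATH.key(), "file:///tmp/seatunnel/out"); // placeholder
            options.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT.toString());
            return new HdfsFileSink(
                    new HadoopConf("file:///"), ReadonlyConfig.fromMap(options), catalogTable);
        }
    }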
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index e7ecd250d2..600a9dff7c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -17,20 +17,39 @@
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Arrays;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+
 @AutoService(Factory.class)
-public class HdfsFileSinkFactory implements TableSinkFactory {
+public class HdfsFileSinkFactory extends BaseMultipleTableFileSinkFactory {
 
     @Override
     public String factoryIdentifier() {
         return FileSystemType.HDFS.getFileSystemPluginName();
@@ -42,6 +61,7 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
                 .required(HdfsSourceConfigOptions.DEFAULT_FS)
                 .required(FileBaseSinkOptions.FILE_PATH)
                 .optional(FileBaseSinkOptions.FILE_FORMAT_TYPE)
+                .optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         FileBaseSinkOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
@@ -111,4 +131,52 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
                 .optional(FileBaseSinkOptions.TMP_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        HadoopConf hadoopConf = initHadoopConf(readonlyConfig);
+        return () -> new HdfsFileSink(hadoopConf, readonlyConfig, catalogTable);
+    }
+
+    public HadoopConf initHadoopConf(ReadonlyConfig readonlyConfig) {
+        Config pluginConfig = readonlyConfig.toConfig();
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(readonlyConfig.toConfig(), FS_DEFAULT_NAME_KEY);
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            factoryIdentifier(), PluginType.SINK, result.getMsg()));
+        }
+
+        HadoopConf hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
+            hadoopConf.setHdfsSitePath(
+                    pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
+            hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
+            hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
+            hadoopConf.setKerberosPrincipal(
+                    pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
+        }
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
+            hadoopConf.setKerberosKeytabPath(
+                    pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
+        }
+
+        return hadoopConf;
+    }
 }
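
For comparison, the factory path added above: createSink(...) validates fs.defaultFS, assembles the HadoopConf (including the optional Kerberos settings), and returns the sink as a lambda TableSink. A sketch, illustrative only; the TableSinkFactoryContext construction copies the pattern from SinkFlowTestUtils earlier in this patch, and options/catalogTable are assumed fixtures.

    // Illustrative sketch of the factory path.
    import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    import org.apache.seatunnel.api.sink.SeaTunnelSink;
    import org.apache.seatunnel.api.table.catalog.CatalogTable;
    import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSinkFactory;

    import java.util.Map;

    class HdfsSinkFactorySketch {
        SeaTunnelSink<SeaTunnelRow, ?, ?, ?> create(
                CatalogTable catalogTable, Map<String, Object> options) {
            TableSinkFactoryContext context =
                    new TableSinkFactoryContext(
                            catalogTable,
                            ReadonlyConfig.fromMap(options),
                            Thread.currentThread().getContextClassLoader());
            // createSink(context) yields a TableSink; its createSink() builds the SeaTunnelSink.
            return new HdfsFileSinkFactory().createSink(context).createSink();
        }
    }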
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java
new file mode 100644
index 0000000000..a4cd34a54f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DisabledOnOs(value = OS.WINDOWS)
+public class HdfsFileSinkTest {
+    private static final String ROW_NAME = "name";
+    private static final String ROW_AGE = "age";
+    private static final String FS_TARGET_PATH = "file:///tmp/seatunnel/hdfs_file_sink_test";
+    private static final String FS_MULTI_TABLE_SINK_PATH =
+            "file:///tmp/seatunnel/hdfs_multi_table_sink_test";
+    private static final String DEFAULT_FS = "file:///";
+
+    CatalogTable catalogTable =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            ROW_NAME, BasicType.STRING_TYPE, 1L, true, null, ""))
+                            .column(
+                                    PhysicalColumn.of(
+                                            ROW_AGE, BasicType.INT_TYPE, 1L, true, null, ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testHdfsFileSinkWithTextFormat() throws Exception {
+        Map<String, Object> config = createBasicConfig();
+        config.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT.toString());
+        config.put(FileBaseSinkOptions.FIELD_DELIMITER.key(), ",");
+
+        List<SeaTunnelRow> rows = createTestRows();
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable, ReadonlyConfig.fromMap(config), new HdfsFileSinkFactory(), rows);
+
+        Path resultPath = new Path(FS_TARGET_PATH);
+        FileSystem fs = resultPath.getFileSystem(new Configuration());
+
+        FileStatus[] fileStatuses =
+                fs.listStatus(resultPath, path -> path.getName().endsWith(".txt"));
+
+        Assertions.assertTrue(fileStatuses.length > 0);
+
+        List<String> readData = readFileContent(fileStatuses[0].getPath(), fs);
+
+        Assertions.assertEquals("Alice,18", readData.get(0));
+        Assertions.assertEquals("Bob,20", readData.get(1));
+
+        fs.delete(new Path(FS_TARGET_PATH), true);
+    }
+
+    @Test
+    public void testHdfsFileSinkWithParquetFormat() throws Exception {
+        Map<String, Object> config = createBasicConfig();
+        config.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.PARQUET.toString());
+
+        List<SeaTunnelRow> rows = createTestRows();
+
+        FileUtils.deleteDirectory(new File(FS_TARGET_PATH));
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable, ReadonlyConfig.fromMap(config), new HdfsFileSinkFactory(), rows);
+
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.defaultFS", "file:///");
+        FileSystem fileSystem = FileSystem.get(hadoopConf);
+
+        Path outputPath = new Path(FS_TARGET_PATH);
+        FileStatus[] fileStatuses = fileSystem.listStatus(outputPath);
+
+        Path parquetFile = null;
+        for (FileStatus status : fileStatuses) {
+            if (!status.isDirectory() && status.getPath().getName().endsWith(".parquet")) {
+                parquetFile = status.getPath();
+                break;
+            }
+        }
+
+        Assertions.assertNotNull(parquetFile);
+
+        ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(parquetFile).withConf(hadoopConf).build();
+
+        GenericRecord record;
+        int recordCount = 0;
+        while ((record = reader.read()) != null) {
+            recordCount++;
+            if (recordCount == 1) {
+                Assertions.assertEquals("Alice", record.get(ROW_NAME).toString());
+                Assertions.assertEquals(18, record.get(ROW_AGE));
+            } else if (recordCount == 2) {
+                Assertions.assertEquals("Bob", record.get(ROW_NAME).toString());
+                Assertions.assertEquals(20, record.get(ROW_AGE));
+            }
+        }
+
+        Assertions.assertEquals(2, recordCount);
+        reader.close();
+
+        fileSystem.delete(new Path(FS_TARGET_PATH), true);
+    }
+
+    @Test
+    public void testTextFormatWithMultiTableSink() throws Exception {
+        String table1Path = FS_MULTI_TABLE_SINK_PATH + "/table1";
+        String table2Path = FS_MULTI_TABLE_SINK_PATH + "/table2";
+
+        Map<String, Object> basicConfig = createBasicConfig();
+        basicConfig.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT.toString());
+        basicConfig.put(FileBaseSinkOptions.FIELD_DELIMITER.key(), ",");
+
+        Map<String, Object> table1Options = new HashMap<>(basicConfig);
+        table1Options.put(FileBaseSinkOptions.FILE_PATH.key(), table1Path);
+
+        Map<String, Object> table2Options = new HashMap<>(basicConfig);
+        table2Options.put(FileBaseSinkOptions.FILE_PATH.key(), table2Path);
+
+        TablePath tablePath1 = TablePath.of("test.table1");
+        TablePath tablePath2 = TablePath.of("test.table2");
+
+        HadoopConf hadoopConf = new HadoopConf(DEFAULT_FS);
+
+        // create multi sink
+        HdfsFileSink sink1 =
+                new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table1Options), catalogTable);
+        HdfsFileSink sink2 =
+                new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table2Options), catalogTable);
+
+        Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+        sinks.put(tablePath1, sink1);
+        sinks.put(tablePath2, sink2);
+
+        // create multi table factory context
+        basicConfig.put(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA.key(), 1);
+        MultiTableFactoryContext multiTableContext =
+                new MultiTableFactoryContext(
+                        ReadonlyConfig.fromMap(basicConfig), getClass().getClassLoader(), sinks);
+
+        // create test rows
+        List<SeaTunnelRow> rows = createTestRows();
+
+        // run multi table sink
+        SinkFlowTestUtils.runBatchWithMultiTableSink(
+                new MultiTableSinkFactory(), multiTableContext, rows, false, 1);
+
+        FileSystem fs = FileSystem.get(new Configuration());
+
+        FileStatus[] fileStatuses1 = fs.listStatus(new Path(table1Path));
+        FileStatus[] fileStatuses2 = fs.listStatus(new Path(table2Path));
+
+        Assertions.assertTrue(fileStatuses1.length > 0);
+        Assertions.assertTrue(fileStatuses2.length > 0);
+
+        List<String> readDataTable1 = readFileContent(fileStatuses1[0].getPath(), fs);
+        List<String> readDataTable2 = readFileContent(fileStatuses2[0].getPath(), fs);
+
+        Assertions.assertEquals("Alice,18", readDataTable1.get(0));
+        Assertions.assertEquals("Bob,20", readDataTable2.get(0));
+
+        fs.delete(new Path(FS_MULTI_TABLE_SINK_PATH), true);
+    }
+
+    private Map<String, Object> createBasicConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+        config.put(FileBaseSinkOptions.FILE_PATH.key(), FS_TARGET_PATH);
+        config.put(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key(), false);
+        config.put(FileBaseSinkOptions.HAVE_PARTITION.key(), false);
+        config.put(FileBaseSinkOptions.ENCODING.key(), "UTF-8");
+        return config;
+    }
+
+    private List<String> readFileContent(Path path, FileSystem fs) throws Exception {
+        List<String> data = new ArrayList<>();
+        try (FSDataInputStream inputStream = fs.open(path);
+                BufferedReader reader =
+                        new BufferedReader(
+                                new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                data.add(line);
+            }
+        }
+
+        return data;
+    }
+
+    private List<SeaTunnelRow> createTestRows() {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+
+        // create first record
+        SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {"Alice", 18});
+        row1.setTableId("test.table1");
+        rows.add(row1);
+
+        // create second record
+        SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {"Bob", 20});
+        row2.setTableId("test.table2");
+        rows.add(row2);
+
+        return rows;
+    }
+}