This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb4f743c05 [Feature][Connector-File-Hadoop]Support multi table sink feature for HdfsFile (#9651)
bb4f743c05 is described below

commit bb4f743c05428b7a148acec65770e742f4aa984a
Author: JeremyXin <[email protected]>
AuthorDate: Mon Aug 11 01:13:30 2025 +0800

    [Feature][Connector-File-Hadoop]Support multi table sink feature for HdfsFile (#9651)
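
A minimal usage sketch of the reworked creation path in this patch: the factory now builds the sink from a TableSinkFactoryContext rather than prepare(Config). This is a hedged illustration, not part of the patch; catalogTable and options are placeholders built the same way as in the new HdfsFileSinkTest at the end of this commit, and the imports are the ones visible in the diffs below.

    // Hedged sketch: factory-based sink creation after this change.
    // catalogTable and options are illustrative placeholders.
    HdfsFileSinkFactory factory = new HdfsFileSinkFactory();
    TableSinkFactoryContext context =
            new TableSinkFactoryContext(
                    catalogTable, options, Thread.currentThread().getContextClassLoader());
    SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink = factory.createSink(context).createSink();
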
---
 .github/workflows/backend.yml                      |   2 +-
 docs/en/connector-v2/sink/HdfsFile.md              |   1 +
 docs/zh/connector-v2/sink/HdfsFile.md              |   1 +
 .../seatunnel/sink/SinkFlowTestUtils.java          |  35 ++-
 .../connector-file/connector-file-hadoop/pom.xml   |   7 +
 .../HdfsFileSinkOptions.java}                      |  32 +--
 .../seatunnel/file/hdfs/sink/HdfsFileSink.java     |  27 +--
 .../file/hdfs/sink/HdfsFileSinkFactory.java        |  72 +++++-
 .../seatunnel/file/hdfs/HdfsFileSinkTest.java      | 266 +++++++++++++++++++++
 9 files changed, 391 insertions(+), 52 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index d5e54feb82..04c21c395d 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -480,7 +480,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 150
+    timeout-minutes: 200
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 3c72f4b2c2..e4cd56bf4e 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -17,6 +17,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
  Use binary file format to read and write files in any format, such as videos, pictures, etc. In short, any files can be synchronized to the target place.
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
 
 By default, we use 2PC commit to ensure `exactly-once`
 
diff --git a/docs/zh/connector-v2/sink/HdfsFile.md b/docs/zh/connector-v2/sink/HdfsFile.md
index 8746a68328..384a20b7f4 100644
--- a/docs/zh/connector-v2/sink/HdfsFile.md
+++ b/docs/zh/connector-v2/sink/HdfsFile.md
@@ -17,6 +17,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
   使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
 
 - [x] [精确一次](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
 
   默认情况下,我们使用2PC提交来确保"精确一次"
 
diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index 40c440f924..982ad50f67 100644
--- a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -78,6 +78,19 @@ public class SinkFlowTestUtils {
         runWithContext(catalogTable, options, factory, rows, context, parallelism);
     }
 
+    public static void runBatchWithMultiTableSink(
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            TableSinkFactoryContext tableSinkFactoryContext,
+            List<SeaTunnelRow> rows,
+            boolean checkpointEnabled,
+            int parallelism)
+            throws IOException {
+        JobContext context = new JobContext(System.currentTimeMillis());
+        context.setJobMode(JobMode.BATCH);
+        context.setEnableCheckpoint(checkpointEnabled);
+        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+    }
+
     private static void runWithContext(
             CatalogTable catalogTable,
             ReadonlyConfig options,
@@ -86,13 +99,23 @@ public class SinkFlowTestUtils {
             JobContext context,
             int parallelism)
             throws IOException {
+
+        TableSinkFactoryContext tableSinkFactoryContext =
+                new TableSinkFactoryContext(
+                        catalogTable, options, Thread.currentThread().getContextClassLoader());
+
+        runWithContext(factory, tableSinkFactoryContext, rows, context, parallelism);
+    }
+
+    private static void runWithContext(
+            TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory,
+            TableSinkFactoryContext tableSinkFactoryContext,
+            List<SeaTunnelRow> rows,
+            JobContext context,
+            int parallelism)
+            throws IOException {
         SeaTunnelSink<SeaTunnelRow, ?, ?, ?> sink =
-                factory.createSink(
-                                new TableSinkFactoryContext(
-                                        catalogTable,
-                                        options,
-                                        Thread.currentThread().getContextClassLoader()))
-                        .createSink();
+                factory.createSink(tableSinkFactoryContext).createSink();
         sink.setJobContext(context);
         List<Object> commitInfos = new ArrayList<>();
         for (int i = 0; i < parallelism; i++) {
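
The new runBatchWithMultiTableSink helper is driven the same way as the existing batch helpers; a hedged call sketch, with arguments mirroring the new HdfsFileSinkTest at the end of this patch (multiTableContext and rows are placeholders built as in that test):

    // Sketch: run rows through a multi-table sink built by MultiTableSinkFactory;
    // checkpointing disabled, parallelism 1, as in the new test below.
    SinkFlowTestUtils.runBatchWithMultiTableSink(
            new MultiTableSinkFactory(), multiTableContext, rows, false, 1);
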
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index 59da8bf844..8e3d641998 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -48,6 +48,13 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
similarity index 50%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
index 5e098ea2d2..e53d07c6ac 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSinkOptions.java
@@ -15,34 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseHdfsFileSink {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-    }
-
-    @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
-    }
-}
+public class HdfsFileSinkOptions extends FileBaseSinkOptions {}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
index 5e098ea2d2..c019211c52 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSink.java
@@ -17,32 +17,31 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
 import java.util.Optional;
 
-@AutoService(SeaTunnelSink.class)
-public class HdfsFileSink extends BaseHdfsFileSink {
+public class HdfsFileSink extends BaseMultipleTableFileSink {
 
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
+    private final CatalogTable catalogTable;
+
+    public HdfsFileSink(
+            HadoopConf hadoopConf, ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        super(hadoopConf, readonlyConfig, catalogTable);
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
+    public String getPluginName() {
+        return FileSystemType.HDFS.getFileSystemPluginName();
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.ofNullable(catalogTable);
     }
 }
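
With the new constructor, one HdfsFileSink is built per table and keyed by its TablePath. A hedged sketch based on the test at the end of this patch (the path, table name, and tableOptions map are placeholders; imports match those in the test diff below):

    // Sketch: per-table sinks registered for multi-table writing.
    HadoopConf hadoopConf = new HadoopConf("file:///"); // fs.defaultFS; local FS for illustration
    HdfsFileSink sink1 =
            new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(tableOptions), catalogTable);
    Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
    sinks.put(TablePath.of("test.table1"), sink1);
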
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index e7ecd250d2..600a9dff7c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -17,20 +17,39 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Arrays;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+
 @AutoService(Factory.class)
-public class HdfsFileSinkFactory implements TableSinkFactory {
+public class HdfsFileSinkFactory extends BaseMultipleTableFileSinkFactory {
     @Override
     public String factoryIdentifier() {
         return FileSystemType.HDFS.getFileSystemPluginName();
@@ -42,6 +61,7 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
                 .required(HdfsSourceConfigOptions.DEFAULT_FS)
                 .required(FileBaseSinkOptions.FILE_PATH)
                 .optional(FileBaseSinkOptions.FILE_FORMAT_TYPE)
+                .optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .conditional(
                         FileBaseSinkOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
@@ -111,4 +131,52 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
                 .optional(FileBaseSinkOptions.TMP_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        HadoopConf hadoopConf = initHadoopConf(readonlyConfig);
+        return () -> new HdfsFileSink(hadoopConf, readonlyConfig, catalogTable);
+    }
+
+    public HadoopConf initHadoopConf(ReadonlyConfig readonlyConfig) {
+        Config pluginConfig = readonlyConfig.toConfig();
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(readonlyConfig.toConfig(), FS_DEFAULT_NAME_KEY);
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            factoryIdentifier(), PluginType.SINK, result.getMsg()));
+        }
+
+        HadoopConf hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
+            hadoopConf.setHdfsSitePath(
+                    pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
+            hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
+            hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
+        }
+
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
+            hadoopConf.setKerberosPrincipal(
+                    pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
+        }
+        if (pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
+            hadoopConf.setKerberosKeytabPath(
+                    pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
+        }
+
+        return hadoopConf;
+    }
 }
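
A hedged sketch of the new initHadoopConf contract shown above: "fs.defaultFS" (FS_DEFAULT_NAME_KEY) is required and validation fails without it, while hdfs-site path, remote user, and Kerberos keys are optional. The namenode address below is a placeholder:

    // Sketch: building HadoopConf from plain options via the new factory method.
    Map<String, Object> options = new HashMap<>();
    options.put("fs.defaultFS", "hdfs://namenode:8020"); // required; placeholder address
    HadoopConf hadoopConf =
            new HdfsFileSinkFactory().initHadoopConf(ReadonlyConfig.fromMap(options));
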
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java
new file mode 100644
index 0000000000..a4cd34a54f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSinkTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@DisabledOnOs(value = OS.WINDOWS)
+public class HdfsFileSinkTest {
+    private static final String ROW_NAME = "name";
+    private static final String ROW_AGE = "age";
+    private static final String FS_TARGET_PATH = "file:///tmp/seatunnel/hdfs_file_sink_test";
+    private static final String FS_MULTI_TABLE_SINK_PATH =
+            "file:///tmp/seatunnel/hdfs_multi_table_sink_test";
+    private static final String DEFAULT_FS = "file:///";
+
+    CatalogTable catalogTable =
+            CatalogTable.of(
+                    TableIdentifier.of("catalog", "database", "table"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            ROW_NAME, BasicType.STRING_TYPE, 1L, true, null, ""))
+                            .column(
+                                    PhysicalColumn.of(
+                                            ROW_AGE, BasicType.INT_TYPE, 1L, true, null, ""))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.emptyList(),
+                    "comment");
+
+    @Test
+    public void testHdfsFileSinkWithTextFormat() throws Exception {
+        Map<String, Object> config = createBasicConfig();
+        config.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT.toString());
+        config.put(FileBaseSinkOptions.FIELD_DELIMITER.key(), ",");
+
+        List<SeaTunnelRow> rows = createTestRows();
+
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable, ReadonlyConfig.fromMap(config), new HdfsFileSinkFactory(), rows);
+
+        Path resultPath = new Path(FS_TARGET_PATH);
+        FileSystem fs = resultPath.getFileSystem(new Configuration());
+
+        FileStatus[] fileStatuses =
+                fs.listStatus(resultPath, path -> path.getName().endsWith(".txt"));
+
+        Assertions.assertTrue(fileStatuses.length > 0);
+
+        List<String> readData = readFileContent(fileStatuses[0].getPath(), fs);
+
+        Assertions.assertEquals("Alice,18", readData.get(0));
+        Assertions.assertEquals("Bob,20", readData.get(1));
+
+        fs.delete(new Path(FS_TARGET_PATH), true);
+    }
+
+    @Test
+    public void testHdfsFileSinkWithParquetFormat() throws Exception {
+        Map<String, Object> config = createBasicConfig();
+        config.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.PARQUET.toString());
+
+        List<SeaTunnelRow> rows = createTestRows();
+
+        FileUtils.deleteDirectory(new File(FS_TARGET_PATH));
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                catalogTable, ReadonlyConfig.fromMap(config), new HdfsFileSinkFactory(), rows);
+
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.defaultFS", "file:///");
+        FileSystem fileSystem = FileSystem.get(hadoopConf);
+
+        Path outputPath = new Path(FS_TARGET_PATH);
+        FileStatus[] fileStatuses = fileSystem.listStatus(outputPath);
+
+        Path parquetFile = null;
+        for (FileStatus status : fileStatuses) {
+            if (!status.isDirectory() && status.getPath().getName().endsWith(".parquet")) {
+                parquetFile = status.getPath();
+                break;
+            }
+        }
+
+        Assertions.assertNotNull(parquetFile);
+
+        ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(parquetFile).withConf(hadoopConf).build();
+
+        GenericRecord record;
+        int recordCount = 0;
+        while ((record = reader.read()) != null) {
+            recordCount++;
+            if (recordCount == 1) {
+                Assertions.assertEquals("Alice", 
record.get(ROW_NAME).toString());
+                Assertions.assertEquals(18, record.get(ROW_AGE));
+            } else if (recordCount == 2) {
+                Assertions.assertEquals("Bob", 
record.get(ROW_NAME).toString());
+                Assertions.assertEquals(20, record.get(ROW_AGE));
+            }
+        }
+
+        Assertions.assertEquals(2, recordCount);
+        reader.close();
+
+        fileSystem.delete(new Path(FS_TARGET_PATH), true);
+    }
+
+    @Test
+    public void testTextFormatWithMultiTableSink() throws Exception {
+        String table1Path = FS_MULTI_TABLE_SINK_PATH + "/table1";
+        String table2Path = FS_MULTI_TABLE_SINK_PATH + "/table2";
+
+        Map<String, Object> basicConfig = createBasicConfig();
+        basicConfig.put(FileBaseSinkOptions.FILE_FORMAT_TYPE.key(), FileFormat.TEXT.toString());
+        basicConfig.put(FileBaseSinkOptions.FIELD_DELIMITER.key(), ",");
+
+        Map<String, Object> table1Options = new HashMap<>(basicConfig);
+        table1Options.put(FileBaseSinkOptions.FILE_PATH.key(), table1Path);
+
+        Map<String, Object> table2Options = new HashMap<>(basicConfig);
+        table2Options.put(FileBaseSinkOptions.FILE_PATH.key(), table2Path);
+
+        TablePath tablePath1 = TablePath.of("test.table1");
+        TablePath tablePath2 = TablePath.of("test.table2");
+
+        HadoopConf hadoopConf = new HadoopConf(DEFAULT_FS);
+
+        // create multi sink
+        HdfsFileSink sink1 =
+                new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table1Options), catalogTable);
+        HdfsFileSink sink2 =
+                new HdfsFileSink(hadoopConf, ReadonlyConfig.fromMap(table2Options), catalogTable);
+
+        Map<TablePath, SeaTunnelSink> sinks = new HashMap<>();
+        sinks.put(tablePath1, sink1);
+        sinks.put(tablePath2, sink2);
+
+        // create multi table factory context
+        basicConfig.put(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA.key(), 1);
+        MultiTableFactoryContext multiTableContext =
+                new MultiTableFactoryContext(
+                        ReadonlyConfig.fromMap(basicConfig), getClass().getClassLoader(), sinks);
+
+        // create test rows
+        List<SeaTunnelRow> rows = createTestRows();
+
+        // run multi table sink
+        SinkFlowTestUtils.runBatchWithMultiTableSink(
+                new MultiTableSinkFactory(), multiTableContext, rows, false, 1);
+
+        FileSystem fs = FileSystem.get(new Configuration());
+
+        FileStatus[] fileStatuses1 = fs.listStatus(new Path(table1Path));
+        FileStatus[] fileStatuses2 = fs.listStatus(new Path(table2Path));
+
+        Assertions.assertTrue(fileStatuses1.length > 0);
+        Assertions.assertTrue(fileStatuses2.length > 0);
+
+        List<String> readDataTable1 = readFileContent(fileStatuses1[0].getPath(), fs);
+        List<String> readDataTable2 = readFileContent(fileStatuses2[0].getPath(), fs);
+
+        Assertions.assertEquals("Alice,18", readDataTable1.get(0));
+        Assertions.assertEquals("Bob,20", readDataTable2.get(0));
+
+        fs.delete(new Path(FS_MULTI_TABLE_SINK_PATH), true);
+    }
+
+    private Map<String, Object> createBasicConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+        config.put(FileBaseSinkOptions.FILE_PATH.key(), FS_TARGET_PATH);
+        config.put(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key(), false);
+        config.put(FileBaseSinkOptions.HAVE_PARTITION.key(), false);
+        config.put(FileBaseSinkOptions.ENCODING.key(), "UTF-8");
+        return config;
+    }
+
+    private List<String> readFileContent(Path path, FileSystem fs) throws Exception {
+        List<String> data = new ArrayList<>();
+        try (FSDataInputStream inputStream = fs.open(path);
+                BufferedReader reader =
+                        new BufferedReader(
+                                new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                data.add(line);
+            }
+        }
+
+        return data;
+    }
+
+    private List<SeaTunnelRow> createTestRows() {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+
+        // create first record
+        SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {"Alice", 18});
+        row1.setTableId("test.table1");
+        rows.add(row1);
+
+        // create second record
+        SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {"Bob", 20});
+        row2.setTableId("test.table2");
+        rows.add(row2);
+
+        return rows;
+    }
+}
