This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 672af255ef [Feature][Connector-V2] Support hdfs file multi table source read (#9816)
672af255ef is described below

commit 672af255efcc239f5c3760d7bbc09a45e5676006
Author: JeremyXin <[email protected]>
AuthorDate: Mon Sep 8 18:35:39 2025 +0800

    [Feature][Connector-V2] Support hdfs file multi table source read (#9816)
---
 docs/en/connector-v2/source/HdfsFile.md            |  41 ++++
 docs/zh/connector-v2/source/HdfsFile.md            |  41 ++++
 .../HdfsFileCatalog.java}                          |  25 +--
 .../file/hdfs/catalog/HdfsFileCatalogFactory.java  |  53 +++++
 .../file/hdfs/config/HdfsFileHadoopConfig.java     |  78 +++++++
 .../HdfsFileSourceConfig.java}                     |  24 +-
 .../MultipleTableHdfsFileSourceConfig.java}        |  25 +--
 .../seatunnel/file/hdfs/source/HdfsFileSource.java |  19 +-
 .../file/hdfs/source/HdfsFileSourceFactory.java    |  17 +-
 .../file/hdfs/HdfsFileSourceConfigTest.java        | 248 +++++++++++++++++++++
 10 files changed, 510 insertions(+), 61 deletions(-)

diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index fbf8632b47..ca3dfb65b6 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,6 +25,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] [support multiple table read](../../concept/connector-v2-features.md)
 - [x] file format type
   - [x] text
   - [x] csv
@@ -261,6 +262,46 @@ sink {
 }
 ```
 
+### Multiple Table
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    tables_configs = [
+      {
+        schema = {
+          table = "student"
+        }
+        path = "/apps/hive/demo/student"
+        file_format_type = "json"
+        fs.defaultFS = "hdfs://namenode001"
+      },
+      {
+        schema = {
+          table = "teacher"
+        }
+        path = "/apps/hive/demo/teacher"
+        file_format_type = "json"
+        fs.defaultFS = "hdfs://namenode001"
+      }
+    ]
+  }
+}
+
+sink {
+  HdfsFile {
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/${table_name}"
+    file_format_type = "orc"
+  }
+}
+
+```
+
 ## Changelog
 
 <ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index 9518acadca..6b1ce1b73e 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -25,6 +25,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 - [x] [列投影](../../concept/connector-v2-features.md)
 - [x] [并行度](../../concept/connector-v2-features.md)
 - [ ] [支持用户定义分片](../../concept/connector-v2-features.md)
+- [x] [支持多表读](../../concept/connector-v2-features.md)
 - [x] 文件格式类型
   - [x] text
   - [x] csv
@@ -262,6 +263,46 @@ sink {
 }
 ```
 
+### 多表配置
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  HdfsFile {
+    tables_configs = [
+      {
+        schema = {
+          table = "student"
+        }
+        path = "/apps/hive/demo/student"
+        file_format_type = "json"
+        fs.defaultFS = "hdfs://namenode001"
+      },
+      {
+        schema = {
+          table = "teacher"
+        }
+        path = "/apps/hive/demo/teacher"
+        file_format_type = "json"
+        fs.defaultFS = "hdfs://namenode001"
+      }
+    ]
+  }
+}
+
+sink {
+  HdfsFile {
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/${table_name}"
+    file_format_type = "orc"
+  }
+}
+
+```
+
 ## 变更日志
 
 <ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
similarity index 53%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
index 1e73a5541b..d2fc65af78 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
@@ -15,26 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.catalog;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+public class HdfsFileCatalog extends AbstractFileCatalog {
 
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
+    protected HdfsFileCatalog(
+            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
+        super(hadoopFileSystemProxy, filePath, catalogName);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java
new file mode 100644
index 0000000000..c0fbb76f04
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HdfsFileCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        HadoopFileSystemProxy hadoopFileSystemProxy =
+                new HadoopFileSystemProxy(HdfsFileHadoopConfig.buildWithConfig(options));
+        return new HdfsFileCatalog(
+                hadoopFileSystemProxy,
+                options.get(HdfsSourceConfigOptions.FILE_PATH),
+                factoryIdentifier());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.HDFS.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java
new file mode 100644
index 0000000000..23d325575f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+
+public class HdfsFileHadoopConfig extends HadoopConf {
+    public HdfsFileHadoopConfig(String hdfsNameKey) {
+        super(hdfsNameKey);
+    }
+
+    public static HadoopConf buildWithConfig(ReadonlyConfig readonlyConfig) {
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        readonlyConfig.toConfig(),
+                        HdfsSourceConfigOptions.FILE_PATH.key(),
+                        HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(),
+                        HdfsSourceConfigOptions.DEFAULT_FS.key());
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            FileSystemType.HDFS.getFileSystemPluginName(),
+                            PluginType.SOURCE,
+                            result.getMsg()));
+        }
+        HadoopConf hadoopConf =
+                new HdfsFileHadoopConfig(readonlyConfig.get(HdfsSourceConfigOptions.DEFAULT_FS));
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH).isPresent()) {
+            hadoopConf.setHdfsSitePath(readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.REMOTE_USER).isPresent()) {
+            hadoopConf.setRemoteUser(readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KRB5_PATH).isPresent()) {
+            hadoopConf.setKrb5Path(readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent()) {
+            hadoopConf.setKerberosPrincipal(
+                    readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent()) {
+            hadoopConf.setKerberosKeytabPath(
+                    readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH));
+        }
+
+        return hadoopConf;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
similarity index 64%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
index 1e73a5541b..1b587a660b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
-import com.google.auto.service.AutoService;
+public class HdfsFileSourceConfig extends BaseFileSourceConfig {
 
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
+    public HdfsFileSourceConfig(ReadonlyConfig readonlyConfig) {
+        super(readonlyConfig);
+    }
 
     @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
+    public HadoopConf getHadoopConfig() {
+        return HdfsFileHadoopConfig.buildWithConfig(getBaseFileSourceConfig());
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
+    public String getPluginName() {
+        return FileSystemType.HDFS.getFileSystemPluginName();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
similarity index 54%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
index 1e73a5541b..f68c3b7d49 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
@@ -15,26 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
-
-    @Override
-    public String getPluginName() {
-        return FileSystemType.HDFS.getFileSystemPluginName();
+public class MultipleTableHdfsFileSourceConfig extends BaseMultipleTableFileSourceConfig {
+    public MultipleTableHdfsFileSourceConfig(ReadonlyConfig hdfsFileSourceConfig) {
+        super(hdfsFileSourceConfig);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
+    public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
+        return new HdfsFileSourceConfig(readonlyConfig);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 1e73a5541b..1d9c01e62f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -17,24 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.MultipleTableHdfsFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
 
-import com.google.auto.service.AutoService;
+public class HdfsFileSource extends BaseMultipleTableFileSource {
 
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
+    public HdfsFileSource(ReadonlyConfig readonlyConfig) {
+        super(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+    }
 
     @Override
     public String getPluginName() {
         return FileSystemType.HDFS.getFileSystemPluginName();
     }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 2a9d784a84..f7d343414a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
@@ -29,10 +32,18 @@ import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSou
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 @AutoService(Factory.class)
 public class HdfsFileSourceFactory implements TableSourceFactory {
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new HdfsFileSource(context.getOptions());
+    }
+
     @Override
     public String factoryIdentifier() {
         return FileSystemType.HDFS.getFileSystemPluginName();
@@ -41,9 +52,9 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HdfsSourceConfigOptions.FILE_PATH)
-                .required(HdfsSourceConfigOptions.DEFAULT_FS)
-                .required(FileBaseSourceOptions.FILE_FORMAT_TYPE)
+                .exclusive(HdfsSourceConfigOptions.TABLE_CONFIGS, HdfsSourceConfigOptions.FILE_PATH)
+                .optional(HdfsSourceConfigOptions.DEFAULT_FS)
+                .optional(FileBaseSourceOptions.FILE_FORMAT_TYPE)
                 .conditional(
                         FileBaseSourceOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
new file mode 100644
index 0000000000..024eaaebd8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Slf4j
+@DisabledOnOs(value = OS.WINDOWS)
+class HdfsFileSourceConfigTest {
+
+    public static final String DATA_FILE_PATH1 = "/tmp/seatunnel/data1.parquet";
+    public static final String DATA_FILE_PATH2 = "/tmp/seatunnel/data2.parquet";
+
+    private static final String DEFAULT_FS = "file:///";
+
+    @BeforeEach
+    public void init() throws IOException {
+        createParquetFile();
+    }
+
+    /** Test whether the Hadoop configuration and Catalog are generated correctly */
+    @Test
+    void testHadoopConfigAndCatalogTable() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH1);
+        configMap.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+        configMap.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+        Map<String, Object> schemaMap = new HashMap<>();
+        Map<String, Object> fieldMap = new HashMap<>();
+        fieldMap.put("id", "int");
+        fieldMap.put("name", "string");
+        schemaMap.put("fields", fieldMap);
+        configMap.put(HdfsSourceConfigOptions.SCHEMA.key(), schemaMap);
+
+        Config config = ConfigFactory.parseMap(configMap);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+
+        HdfsFileSourceConfig sourceConfig = new HdfsFileSourceConfig(readonlyConfig);
+        ReadStrategy readStrategy = sourceConfig.getReadStrategy();
+        CatalogTable catalogTable = sourceConfig.getCatalogTable();
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        HadoopConf hadoopConf = sourceConfig.getHadoopConfig();
+
+        Assertions.assertNotNull(hadoopConf);
+        Assertions.assertNotNull(catalogTable);
+        Assertions.assertNotNull(seaTunnelRowType);
+
+        // verify field names in seaTunnelRowType
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        assertEquals("id", fieldNames[0]);
+        assertEquals("name", fieldNames[1]);
+
+        // verify field types in seaTunnelRowType
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        assertEquals(BasicType.INT_TYPE, fieldTypes[0]);
+        assertEquals(BasicType.STRING_TYPE, fieldTypes[1]);
+
+        Assertions.assertInstanceOf(ParquetReadStrategy.class, readStrategy);
+    }
+
+    /** Test multi-file reading based on the parquet file format */
+    @Test
+    public void parquetFileMultiSourceRead() throws Exception {
+        List<Map<String, Object>> tableConfigList = new ArrayList<>();
+
+        Map<String, Object> tableConfig1 = new HashMap<>();
+        // schema1
+        Map<String, Object> schema1 = new HashMap<>();
+        schema1.put("table", "db1.table1");
+
+        tableConfig1.put(HdfsSourceConfigOptions.SCHEMA.key(), schema1);
+        tableConfig1.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH1);
+        tableConfig1.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+        tableConfig1.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+        Map<String, Object> tableConfig2 = new HashMap<>();
+        // schema2
+        Map<String, Object> schema2 = new HashMap<>();
+        schema2.put("table", "db2.table2");
+        tableConfig2.put(HdfsSourceConfigOptions.SCHEMA.key(), schema2);
+        tableConfig2.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH2);
+        tableConfig2.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+        tableConfig2.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+        tableConfigList.add(tableConfig1);
+        tableConfigList.add(tableConfig2);
+
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HdfsSourceConfigOptions.TABLE_CONFIGS.key(), tableConfigList);
+
+        // create parquet file
+        createParquetFile();
+
+        List<SeaTunnelRow> seaTunnelRows =
+                SourceFlowTestUtils.runBatchWithCheckpointDisabled(
+                        ReadonlyConfig.fromMap(configMap), new HdfsFileSourceFactory());
+
+        Assertions.assertEquals(4, seaTunnelRows.size());
+
+        Assertions.assertEquals("db1.table1", 
seaTunnelRows.get(0).getTableId());
+        Assertions.assertEquals("db1.table1", 
seaTunnelRows.get(1).getTableId());
+        Assertions.assertEquals("db2.table2", 
seaTunnelRows.get(2).getTableId());
+        Assertions.assertEquals("db2.table2", 
seaTunnelRows.get(3).getTableId());
+
+        Assertions.assertEquals(1, seaTunnelRows.get(0).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read1", 
seaTunnelRows.get(0).getField(1));
+        Assertions.assertEquals(2, seaTunnelRows.get(1).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read2", 
seaTunnelRows.get(1).getField(1));
+        Assertions.assertEquals(3, seaTunnelRows.get(2).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read3", 
seaTunnelRows.get(2).getField(1));
+        Assertions.assertEquals(4, seaTunnelRows.get(3).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read4", 
seaTunnelRows.get(3).getField(1));
+    }
+
+    @AfterEach
+    public void clear() throws IOException {
+        deleteFile(DATA_FILE_PATH1);
+        deleteFile(DATA_FILE_PATH2);
+    }
+
+    /** Create two parquet files for test */
+    private void createParquetFile() throws IOException {
+
+        // create avro schema
+        String schemaJson =
+                "{\"type\":\"record\",\"name\":\"test\",\"fields\":["
+                        + "{\"name\":\"id\",\"type\":\"int\"},"
+                        + "{\"name\":\"name\",\"type\":\"string\"}"
+                        + "]}";
+        Schema avroSchema = new Schema.Parser().parse(schemaJson);
+
+        // create first parquet file
+        Configuration conf1 = new Configuration();
+        Path path1 = new Path(DATA_FILE_PATH1);
+
+        try (ParquetWriter<GenericData.Record> writer =
+                AvroParquetWriter.<GenericData.Record>builder(path1)
+                        .withSchema(avroSchema)
+                        .withConf(conf1)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .build()) {
+
+            // write first data
+            GenericData.Record record1 = new GenericData.Record(avroSchema);
+            record1.put("id", 1);
+            record1.put("name", "hdfs_multi_source_read1");
+            writer.write(record1);
+
+            // write second data
+            GenericData.Record record2 = new GenericData.Record(avroSchema);
+            record2.put("id", 2);
+            record2.put("name", "hdfs_multi_source_read2");
+            writer.write(record2);
+        }
+
+        // create second file
+        Configuration conf2 = new Configuration();
+        Path path2 = new Path(DATA_FILE_PATH2);
+
+        try (ParquetWriter<GenericData.Record> writer =
+                AvroParquetWriter.<GenericData.Record>builder(path2)
+                        .withSchema(avroSchema)
+                        .withConf(conf2)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .build()) {
+
+            // write first data
+            GenericData.Record record1 = new GenericData.Record(avroSchema);
+            record1.put("id", 3);
+            record1.put("name", "hdfs_multi_source_read3");
+            writer.write(record1);
+
+            // write second data
+            GenericData.Record record2 = new GenericData.Record(avroSchema);
+            record2.put("id", 4);
+            record2.put("name", "hdfs_multi_source_read4");
+            writer.write(record2);
+        }
+    }
+
+    private void deleteFile(String path) throws IOException {
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.set("fs.defaultFS", "file:///");
+        FileSystem fileSystem = FileSystem.get(hadoopConf);
+
+        fileSystem.delete(new Path(path), true);
+    }
+}

