This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 672af255ef [Feature][Connector-V2] Support hdfs file multi table source read (#9816)
672af255ef is described below
commit 672af255efcc239f5c3760d7bbc09a45e5676006
Author: JeremyXin <[email protected]>
AuthorDate: Mon Sep 8 18:35:39 2025 +0800
[Feature][Connector-V2] Support hdfs file multi table source read (#9816)
---
docs/en/connector-v2/source/HdfsFile.md | 41 ++++
docs/zh/connector-v2/source/HdfsFile.md | 41 ++++
.../HdfsFileCatalog.java} | 25 +--
.../file/hdfs/catalog/HdfsFileCatalogFactory.java | 53 +++++
.../file/hdfs/config/HdfsFileHadoopConfig.java | 78 +++++++
.../HdfsFileSourceConfig.java} | 24 +-
.../MultipleTableHdfsFileSourceConfig.java} | 25 +--
.../seatunnel/file/hdfs/source/HdfsFileSource.java | 19 +-
.../file/hdfs/source/HdfsFileSourceFactory.java | 17 +-
.../file/hdfs/HdfsFileSourceConfigTest.java | 248 +++++++++++++++++++++
10 files changed, 510 insertions(+), 61 deletions(-)
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index fbf8632b47..ca3dfb65b6 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,6 +25,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] [support multiple table read](../../concept/connector-v2-features.md)
- [x] file format type
- [x] text
- [x] csv
@@ -261,6 +262,46 @@ sink {
}
```
+### Multiple Table
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ tables_configs = [
+ {
+ schema = {
+ table = "student"
+ }
+ path = "/apps/hive/demo/student"
+ file_format_type = "json"
+ fs.defaultFS = "hdfs://namenode001"
+ },
+ {
+ schema = {
+ table = "teacher"
+ }
+ path = "/apps/hive/demo/teacher"
+ file_format_type = "json"
+ fs.defaultFS = "hdfs://namenode001"
+ }
+ ]
+ }
+}
+
+sink {
+ HdfsFile {
+ fs.defaultFS = "hdfs://hadoopcluster"
+ path = "/tmp/hive/warehouse/${table_name}"
+ file_format_type = "orc"
+ }
+}
+
+```
+
## Changelog
<ChangeLog />
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index 9518acadca..6b1ce1b73e 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -25,6 +25,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
- [x] [列投影](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户定义分片](../../concept/connector-v2-features.md)
+- [x] [支持多表读](../../concept/connector-v2-features.md)
- [x] 文件格式类型
- [x] text
- [x] csv
@@ -262,6 +263,46 @@ sink {
}
```
+### 多表配置
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ HdfsFile {
+ tables_configs = [
+ {
+ schema = {
+ table = "student"
+ }
+ path = "/apps/hive/demo/student"
+ file_format_type = "json"
+ fs.defaultFS = "hdfs://namenode001"
+ },
+ {
+ schema = {
+ table = "teacher"
+ }
+ path = "/apps/hive/demo/teacher"
+ file_format_type = "json"
+ fs.defaultFS = "hdfs://namenode001"
+ }
+ ]
+ }
+}
+
+sink {
+ HdfsFile {
+ fs.defaultFS = "hdfs://hadoopcluster"
+ path = "/tmp/hive/warehouse/${table_name}"
+ file_format_type = "orc"
+ }
+}
+
+```
+
## 变更日志
<ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
similarity index 53%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
index 1e73a5541b..d2fc65af78 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalog.java
@@ -15,26 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.catalog;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+public class HdfsFileCatalog extends AbstractFileCatalog {
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
-
- @Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
+    protected HdfsFileCatalog(
+            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) {
+        super(hadoopFileSystemProxy, filePath, catalogName);
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java
new file mode 100644
index 0000000000..c0fbb76f04
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/catalog/HdfsFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class HdfsFileCatalogFactory implements CatalogFactory {
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        HadoopFileSystemProxy hadoopFileSystemProxy =
+                new HadoopFileSystemProxy(HdfsFileHadoopConfig.buildWithConfig(options));
+        return new HdfsFileCatalog(
+                hadoopFileSystemProxy,
+                options.get(HdfsSourceConfigOptions.FILE_PATH),
+                factoryIdentifier());
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return FileSystemType.HDFS.getFileSystemPluginName();
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+}
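
For orientation, here is a minimal sketch (not part of this commit) of driving the new factory by hand. It assumes only the `createCatalog(String, ReadonlyConfig)` signature above, `ReadonlyConfig.fromMap` (used in the test at the bottom of this diff), and the option keys from the docs example; the class name and values are illustrative.

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.catalog.HdfsFileCatalogFactory;

import java.util.HashMap;
import java.util.Map;

public class HdfsCatalogFactorySketch {
    public static void main(String[] args) {
        // The three keys that HdfsFileHadoopConfig.buildWithConfig requires.
        Map<String, Object> options = new HashMap<>();
        options.put("path", "/apps/hive/demo/student");     // FILE_PATH
        options.put("file_format_type", "json");            // FILE_FORMAT_TYPE
        options.put("fs.defaultFS", "hdfs://namenode001");  // DEFAULT_FS

        // factoryIdentifier() resolves to the "HdfsFile" plugin name.
        Catalog catalog =
                new HdfsFileCatalogFactory()
                        .createCatalog("HdfsFile", ReadonlyConfig.fromMap(options));
    }
}
```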
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java
new file mode 100644
index 0000000000..23d325575f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileHadoopConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+
+public class HdfsFileHadoopConfig extends HadoopConf {
+ public HdfsFileHadoopConfig(String hdfsNameKey) {
+ super(hdfsNameKey);
+ }
+
+ public static HadoopConf buildWithConfig(ReadonlyConfig readonlyConfig) {
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ readonlyConfig.toConfig(),
+ HdfsSourceConfigOptions.FILE_PATH.key(),
+ HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(),
+ HdfsSourceConfigOptions.DEFAULT_FS.key());
+ if (!result.isSuccess()) {
+ throw new FileConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ FileSystemType.HDFS.getFileSystemPluginName(),
+ PluginType.SOURCE,
+ result.getMsg()));
+ }
+        HadoopConf hadoopConf =
+                new HdfsFileHadoopConfig(readonlyConfig.get(HdfsSourceConfigOptions.DEFAULT_FS));
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH).isPresent()) {
+            hadoopConf.setHdfsSitePath(readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.REMOTE_USER).isPresent()) {
+            hadoopConf.setRemoteUser(readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KRB5_PATH).isPresent()) {
+            hadoopConf.setKrb5Path(readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent()) {
+            hadoopConf.setKerberosPrincipal(
+                    readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL));
+        }
+
+        if (readonlyConfig.getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent()) {
+            hadoopConf.setKerberosKeytabPath(
+                    readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH));
+        }
+
+        return hadoopConf;
+ }
+}
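
A quick sketch (again, not part of the commit) of the failure path above: dropping one of the three required keys should make `checkAllExists` fail and `buildWithConfig` throw with `CONFIG_VALIDATION_FAILED`. It assumes `ReadonlyConfig.fromMap` as used in the test below; the class name is illustrative.

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileHadoopConfig;

import java.util.HashMap;
import java.util.Map;

public class HadoopConfigValidationSketch {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("path", "/apps/hive/demo/student");
        conf.put("file_format_type", "json");
        // "fs.defaultFS" deliberately omitted.

        try {
            HdfsFileHadoopConfig.buildWithConfig(ReadonlyConfig.fromMap(conf));
        } catch (FileConnectorException e) {
            // Expected shape: "PluginName: HdfsFile, PluginType: SOURCE, Message: ..."
            System.out.println(e.getMessage());
        }
    }
}
```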
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
similarity index 64%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
index 1e73a5541b..1b587a660b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/HdfsFileSourceConfig.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import com.google.auto.service.AutoService;
+public class HdfsFileSourceConfig extends BaseFileSourceConfig {
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
+ public HdfsFileSourceConfig(ReadonlyConfig readonlyConfig) {
+ super(readonlyConfig);
+ }
@Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
+ public HadoopConf getHadoopConfig() {
+ return HdfsFileHadoopConfig.buildWithConfig(getBaseFileSourceConfig());
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
+ public String getPluginName() {
+ return FileSystemType.HDFS.getFileSystemPluginName();
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
similarity index 54%
copy from seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
copy to seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
index 1e73a5541b..f68c3b7d49 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/config/MultipleTableHdfsFileSourceConfig.java
@@ -15,26 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
-
- @Override
- public String getPluginName() {
- return FileSystemType.HDFS.getFileSystemPluginName();
+public class MultipleTableHdfsFileSourceConfig extends BaseMultipleTableFileSourceConfig {
+    public MultipleTableHdfsFileSourceConfig(ReadonlyConfig hdfsFileSourceConfig) {
+        super(hdfsFileSourceConfig);
}
@Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
+    public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
+        return new HdfsFileSourceConfig(readonlyConfig);
}
}
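
To make the fan-out concrete: each entry under `tables_configs` becomes its own `HdfsFileSourceConfig` via `getBaseSourceConfig`. A hedged sketch (not part of the commit) mirroring the map shape used in the test at the end of this diff; it assumes the referenced parquet files already exist on disk, since the per-table configs may read them during construction.

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.MultipleTableHdfsFileSourceConfig;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiTableConfigSketch {
    public static void main(String[] args) {
        List<Map<String, Object>> tables = new ArrayList<>();
        for (String path :
                new String[] {"/tmp/seatunnel/data1.parquet", "/tmp/seatunnel/data2.parquet"}) {
            Map<String, Object> table = new HashMap<>();
            table.put("path", path);                // must exist on disk
            table.put("file_format_type", "parquet");
            table.put("fs.defaultFS", "file:///");
            tables.add(table);
        }

        Map<String, Object> root = new HashMap<>();
        root.put("tables_configs", tables);

        // One ReadonlyConfig carrying the list fans out into per-table configs.
        MultipleTableHdfsFileSourceConfig multiConfig =
                new MultipleTableHdfsFileSourceConfig(ReadonlyConfig.fromMap(root));
    }
}
```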
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 1e73a5541b..1d9c01e62f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -17,24 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.MultipleTableHdfsFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
-import com.google.auto.service.AutoService;
+public class HdfsFileSource extends BaseMultipleTableFileSource {
-@AutoService(SeaTunnelSource.class)
-public class HdfsFileSource extends BaseHdfsFileSource {
+ public HdfsFileSource(ReadonlyConfig readonlyConfig) {
+ super(new MultipleTableHdfsFileSourceConfig(readonlyConfig));
+ }
@Override
public String getPluginName() {
return FileSystemType.HDFS.getFileSystemPluginName();
}
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- super.prepare(pluginConfig);
- }
}
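
With the `prepare(Config)` hook gone, construction is the whole configuration step. A small sketch (not part of the commit), reusing the local parquet file that the test below writes to `/tmp/seatunnel/data1.parquet`; the class name is illustrative.

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSource;

import java.util.HashMap;
import java.util.Map;

public class HdfsSourceConstructionSketch {
    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("path", "/tmp/seatunnel/data1.parquet");  // must exist on disk
        conf.put("file_format_type", "parquet");
        conf.put("fs.defaultFS", "file:///");

        // A single-table config is wrapped into the multi-table form internally.
        HdfsFileSource source = new HdfsFileSource(ReadonlyConfig.fromMap(conf));
        System.out.println(source.getPluginName()); // "HdfsFile"
    }
}
```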
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 2a9d784a84..f7d343414a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -20,8 +20,11 @@ package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
@@ -29,10 +32,18 @@ import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSou
import com.google.auto.service.AutoService;
+import java.io.Serializable;
import java.util.Arrays;
@AutoService(Factory.class)
public class HdfsFileSourceFactory implements TableSourceFactory {
+
+ @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>) new HdfsFileSource(context.getOptions());
+    }
+
@Override
public String factoryIdentifier() {
return FileSystemType.HDFS.getFileSystemPluginName();
@@ -41,9 +52,9 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(HdfsSourceConfigOptions.FILE_PATH)
- .required(HdfsSourceConfigOptions.DEFAULT_FS)
- .required(FileBaseSourceOptions.FILE_FORMAT_TYPE)
+                .exclusive(HdfsSourceConfigOptions.TABLE_CONFIGS, HdfsSourceConfigOptions.FILE_PATH)
+ .optional(HdfsSourceConfigOptions.DEFAULT_FS)
+ .optional(FileBaseSourceOptions.FILE_FORMAT_TYPE)
.conditional(
FileBaseSourceOptions.FILE_FORMAT_TYPE,
FileFormat.TEXT,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
new file mode 100644
index 0000000000..024eaaebd8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/HdfsFileSourceConfigTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.hdfs;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.config.HdfsFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Slf4j
+@DisabledOnOs(value = OS.WINDOWS)
+class HdfsFileSourceConfigTest {
+
+    public static final String DATA_FILE_PATH1 = "/tmp/seatunnel/data1.parquet";
+    public static final String DATA_FILE_PATH2 = "/tmp/seatunnel/data2.parquet";
+
+ private static final String DEFAULT_FS = "file:///";
+
+ @BeforeEach
+ public void init() throws IOException {
+ createParquetFile();
+ }
+
+    /** Test whether the Hadoop configuration and Catalog are generated correctly */
+ @Test
+ void testHadoopConfigAndCatalogTable() {
+ Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH1);
+        configMap.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+ configMap.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+ Map<String, Object> schemaMap = new HashMap<>();
+        Map<String, Object> fieldMap = new HashMap<>();
+        fieldMap.put("id", "int");
+        fieldMap.put("name", "string");
+        schemaMap.put("fields", fieldMap);
+ configMap.put(HdfsSourceConfigOptions.SCHEMA.key(), schemaMap);
+
+ Config config = ConfigFactory.parseMap(configMap);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
+
+        HdfsFileSourceConfig sourceConfig = new HdfsFileSourceConfig(readonlyConfig);
+ ReadStrategy readStrategy = sourceConfig.getReadStrategy();
+ CatalogTable catalogTable = sourceConfig.getCatalogTable();
+ SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ HadoopConf hadoopConf = sourceConfig.getHadoopConfig();
+
+ Assertions.assertNotNull(hadoopConf);
+ Assertions.assertNotNull(catalogTable);
+ Assertions.assertNotNull(seaTunnelRowType);
+
+ // verify field names in seaTunnelRowType
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ assertEquals("id", fieldNames[0]);
+ assertEquals("name", fieldNames[1]);
+
+ // verify field types in seaTunnelRowType
+ SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ assertEquals(BasicType.INT_TYPE, fieldTypes[0]);
+ assertEquals(BasicType.STRING_TYPE, fieldTypes[1]);
+
+ Assertions.assertInstanceOf(ParquetReadStrategy.class, readStrategy);
+ }
+
+ /** Test multi-file reading based on the parquet file format */
+ @Test
+ public void parquetFileMultiSourceRead() throws Exception {
+ List<Map<String, Object>> tableConfigList = new ArrayList<>();
+
+ Map<String, Object> tableConfig1 = new HashMap<>();
+ // schema1
+ Map<String, Object> schema1 = new HashMap<>();
+ schema1.put("table", "db1.table1");
+
+ tableConfig1.put(HdfsSourceConfigOptions.SCHEMA.key(), schema1);
+        tableConfig1.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH1);
+        tableConfig1.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+ tableConfig1.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+ Map<String, Object> tableConfig2 = new HashMap<>();
+ // schema2
+ Map<String, Object> schema2 = new HashMap<>();
+ schema2.put("table", "db2.table2");
+ tableConfig2.put(HdfsSourceConfigOptions.SCHEMA.key(), schema2);
+        tableConfig2.put(HdfsSourceConfigOptions.FILE_PATH.key(), DATA_FILE_PATH2);
+        tableConfig2.put(HdfsSourceConfigOptions.FILE_FORMAT_TYPE.key(), "parquet");
+ tableConfig2.put(HdfsSourceConfigOptions.DEFAULT_FS.key(), DEFAULT_FS);
+
+ tableConfigList.add(tableConfig1);
+ tableConfigList.add(tableConfig2);
+
+ Map<String, Object> configMap = new HashMap<>();
+        configMap.put(HdfsSourceConfigOptions.TABLE_CONFIGS.key(), tableConfigList);
+
+ // create parquet file
+ createParquetFile();
+
+        List<SeaTunnelRow> seaTunnelRows =
+                SourceFlowTestUtils.runBatchWithCheckpointDisabled(
+                        ReadonlyConfig.fromMap(configMap), new HdfsFileSourceFactory());
+
+ Assertions.assertEquals(4, seaTunnelRows.size());
+
+ Assertions.assertEquals("db1.table1",
seaTunnelRows.get(0).getTableId());
+ Assertions.assertEquals("db1.table1",
seaTunnelRows.get(1).getTableId());
+ Assertions.assertEquals("db2.table2",
seaTunnelRows.get(2).getTableId());
+ Assertions.assertEquals("db2.table2",
seaTunnelRows.get(3).getTableId());
+
+        Assertions.assertEquals(1, seaTunnelRows.get(0).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read1", seaTunnelRows.get(0).getField(1));
+        Assertions.assertEquals(2, seaTunnelRows.get(1).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read2", seaTunnelRows.get(1).getField(1));
+        Assertions.assertEquals(3, seaTunnelRows.get(2).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read3", seaTunnelRows.get(2).getField(1));
+        Assertions.assertEquals(4, seaTunnelRows.get(3).getField(0));
+        Assertions.assertEquals("hdfs_multi_source_read4", seaTunnelRows.get(3).getField(1));
+ }
+
+ @AfterEach
+ public void clear() throws IOException {
+ deleteFile(DATA_FILE_PATH1);
+ deleteFile(DATA_FILE_PATH2);
+ }
+
+    /** Create two parquet files for the test */
+ private void createParquetFile() throws IOException {
+
+ // create avro schema
+ String schemaJson =
+ "{\"type\":\"record\",\"name\":\"test\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"}"
+ + "]}";
+ Schema avroSchema = new Schema.Parser().parse(schemaJson);
+
+ // create first parquet file
+ Configuration conf1 = new Configuration();
+ Path path1 = new Path(DATA_FILE_PATH1);
+
+ try (ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record>builder(path1)
+ .withSchema(avroSchema)
+ .withConf(conf1)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()) {
+
+ // write first data
+ GenericData.Record record1 = new GenericData.Record(avroSchema);
+ record1.put("id", 1);
+ record1.put("name", "hdfs_multi_source_read1");
+ writer.write(record1);
+
+ // write second data
+ GenericData.Record record2 = new GenericData.Record(avroSchema);
+ record2.put("id", 2);
+ record2.put("name", "hdfs_multi_source_read2");
+ writer.write(record2);
+ }
+
+ // create second file
+ Configuration conf2 = new Configuration();
+ Path path2 = new Path(DATA_FILE_PATH2);
+
+ try (ParquetWriter<GenericData.Record> writer =
+ AvroParquetWriter.<GenericData.Record>builder(path2)
+ .withSchema(avroSchema)
+ .withConf(conf2)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .build()) {
+
+ // write first data
+ GenericData.Record record1 = new GenericData.Record(avroSchema);
+ record1.put("id", 3);
+ record1.put("name", "hdfs_multi_source_read3");
+ writer.write(record1);
+
+ // write second data
+ GenericData.Record record2 = new GenericData.Record(avroSchema);
+ record2.put("id", 4);
+ record2.put("name", "hdfs_multi_source_read4");
+ writer.write(record2);
+ }
+ }
+
+ private void deleteFile(String path) throws IOException {
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set("fs.defaultFS", "file:///");
+ FileSystem fileSystem = FileSystem.get(hadoopConf);
+
+ fileSystem.delete(new Path(path), true);
+ }
+}