This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 22fe27a3d6 [Feature][Connector-V2]Ftp file source support multiple table (#7795)
22fe27a3d6 is described below

commit 22fe27a3d69e92c7697af6f62dedd903c259f12a
Author: 老王 <58297137+chl-...@users.noreply.github.com>
AuthorDate: Tue Oct 8 21:18:45 2024 +0800

    [Feature][Connector-V2]Ftp file source support multiple table (#7795)
---
 docs/en/connector-v2/source/FtpFile.md             |  61 +++++++++
 .../file/ftp/config/FTPFileSourceConfig.java       |  45 +++++++
 .../config/MultipleTableFTPFileSourceConfig.java   |  34 +++++
 .../seatunnel/file/ftp/source/FtpFileSource.java   | 115 +----------------
 .../file/ftp/source/FtpFileSourceFactory.java      |  22 +++-
 .../e2e/connector/file/ftp/FtpFileIT.java          |  36 +++++-
 ...ftp_file_json_to_assert_with_multipletable.conf | 140 +++++++++++++++++++++
 7 files changed, 334 insertions(+), 119 deletions(-)

diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 656f7a0042..ec02f77f9f 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -306,6 +306,67 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
 
 ```
 
+### Multiple Table
+
+```hocon
+
+FtpFile {
+  tables_configs = [
+    {
+      schema {
+        table = "student"
+      }
+      path = "/tmp/seatunnel/sink/text"
+      host = "192.168.31.48"
+      port = 21
+      user = tyrantlucifer
+      password = tianchao
+      file_format_type = "parquet"
+    },
+    {
+      schema {
+        table = "teacher"
+      }
+      path = "/tmp/seatunnel/sink/text"
+      host = "192.168.31.48"
+      port = 21
+      user = tyrantlucifer
+      password = tianchao
+      file_format_type = "parquet"
+    }
+  ]
+}
+
+```
+
+```hocon
+
+FtpFile {
+  tables_configs = [
+    {
+      schema {
+        fields {
+          name = string
+          age = int
+        }
+      }
+      path = "/apps/hive/demo/student"
+      file_format_type = "json"
+    },
+    {
+      schema {
+        fields {
+          name = string
+          age = int
+        }
+      }
+      path = "/apps/hive/demo/teacher"
+      file_format_type = "json"
+    }
+  ]
+}
+
+```
+
 ### Transfer Binary File
 
 ```hocon
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java
new file mode 100644
index 0000000000..8677ed29d4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FTPFileSourceConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import lombok.Getter;
+
+@Getter
+public class FTPFileSourceConfig extends BaseFileSourceConfig {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public HadoopConf getHadoopConfig() {
+        return FtpConf.buildWithConfig(getBaseFileSourceConfig());
+    }
+
+    @Override
+    public String getPluginName() {
+        return FileSystemType.FTP.getFileSystemPluginName();
+    }
+
+    public FTPFileSourceConfig(ReadonlyConfig readonlyConfig) {
+        super(readonlyConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java
new file mode 100644
index 0000000000..78a04c648e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/MultipleTableFTPFileSourceConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
+
+public class MultipleTableFTPFileSourceConfig extends BaseMultipleTableFileSourceConfig {
+
+    public MultipleTableFTPFileSourceConfig(ReadonlyConfig ossFileSourceRootConfig) {
+        super(ossFileSourceRootConfig);
+    }
+
+    @Override
+    public BaseFileSourceConfig getBaseSourceConfig(ReadonlyConfig readonlyConfig) {
+        return new FTPFileSourceConfig(readonlyConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index d6f0f64abb..b8e798ba0a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -17,121 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
-import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
-import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.MultipleTableFTPFileSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseMultipleTableFileSource;
 
-import java.io.IOException;
+public class FtpFileSource extends BaseMultipleTableFileSource {
+    public FtpFileSource(ReadonlyConfig readonlyConfig) {
+        super(new MultipleTableFTPFileSourceConfig(readonlyConfig));
+    }
 
-@AutoService(SeaTunnelSource.class)
-public class FtpFileSource extends BaseFileSource {
     @Override
     public String getPluginName() {
         return FileSystemType.FTP.getFileSystemPluginName();
     }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        FtpConfigOptions.FILE_PATH.key(),
-                        FtpConfigOptions.FILE_FORMAT_TYPE.key(),
-                        FtpConfigOptions.FTP_HOST.key(),
-                        FtpConfigOptions.FTP_PORT.key(),
-                        FtpConfigOptions.FTP_USERNAME.key(),
-                        FtpConfigOptions.FTP_PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-        FileFormat fileFormat =
-                FileFormat.valueOf(
-                        pluginConfig
-                                .getString(FtpConfigOptions.FILE_FORMAT_TYPE.key())
-                                .toUpperCase());
-        if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
-            throw new FileConnectorException(
-                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                    "Ftp file source connector only support read [text, csv, json] files");
-        }
-        String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key());
-        hadoopConf = FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
-        readStrategy =
-                ReadStrategyFactory.of(
-                        pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key()));
-        readStrategy.setPluginConfig(pluginConfig);
-        readStrategy.init(hadoopConf);
-        try {
-            filePaths = readStrategy.getFileNamesByPath(path);
-        } catch (IOException e) {
-            String errorMsg = String.format("Get file list from this path [%s] failed", path);
-            throw new FileConnectorException(
-                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
-        }
-        // support user-defined schema
-        // only json type support user-defined schema now
-        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            switch (fileFormat) {
-                case CSV:
-                case TEXT:
-                case JSON:
-                case EXCEL:
-                case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
-                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
-                    break;
-                case ORC:
-                case PARQUET:
-                case BINARY:
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
-                            "SeaTunnel does not support user-defined schema for [parquet, orc, binary] files");
-                default:
-                    // never got in there
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                            "SeaTunnel does not supported this file format");
-            }
-        } else {
-            if (filePaths.isEmpty()) {
-                // When the directory is empty, distribute default behavior schema
-                rowType = CatalogTableUtil.buildSimpleTextSchema();
-                return;
-            }
-            try {
-                rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
-            } catch (FileConnectorException e) {
-                String errorMsg =
-                        String.format("Get table schema from file [%s] failed", filePaths.get(0));
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
-            }
-        }
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 112cccc3af..9bb4b98e05 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -19,9 +19,12 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
@@ -29,6 +32,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOption
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
 @AutoService(Factory.class)
@@ -38,15 +42,21 @@ public class FtpFileSourceFactory implements TableSourceFactory {
         return FileSystemType.FTP.getFileSystemPluginName();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new FtpFileSource(context.getOptions());
+    }
+
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(FtpConfigOptions.FILE_PATH)
-                .required(FtpConfigOptions.FTP_HOST)
-                .required(FtpConfigOptions.FTP_PORT)
-                .required(FtpConfigOptions.FTP_USERNAME)
-                .required(FtpConfigOptions.FTP_PASSWORD)
-                .required(FtpConfigOptions.FILE_FORMAT_TYPE)
+                .optional(FtpConfigOptions.FILE_PATH)
+                .optional(FtpConfigOptions.FTP_HOST)
+                .optional(FtpConfigOptions.FTP_PORT)
+                .optional(FtpConfigOptions.FTP_USERNAME)
+                .optional(FtpConfigOptions.FTP_PASSWORD)
+                .optional(FtpConfigOptions.FILE_FORMAT_TYPE)
                 .conditional(
                         BaseSourceConfigOptions.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 165d48e631..70b2463ea8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -158,13 +158,19 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         helper.execute("/orc/fake_to_ftp_file_orc.conf");
         // test write ftp root path excel file
         helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
+        // test ftp source support multipleTable
+
+        String homePath = "/home/vsftpd/seatunnel";
+        String sink01 = "/tmp/seatunnel/json/sink/multiplesource/fake01";
+        String sink02 = "/tmp/seatunnel/json/sink/multiplesource/fake02";
+        deleteFileFromContainer(homePath + sink01);
+        deleteFileFromContainer(homePath + sink02);
+        helper.execute("/json/ftp_file_json_to_assert_with_multipletable.conf");
+        Assertions.assertEquals(getFileListFromContainer(homePath + sink01).size(), 1);
+        Assertions.assertEquals(getFileListFromContainer(homePath + sink02).size(), 1);
     }
 
     @TestTemplate
-    @DisabledOnContainer(
-            value = {},
-            type = {EngineType.FLINK},
-            disabledReason = "Flink dosen't support multi-table at now")
     public void testMultipleTableAndSaveMode(TestContainer container)
             throws IOException, InterruptedException {
         TestHelper helper = new TestHelper(container);
@@ -172,6 +178,8 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         String homePath = "/home/vsftpd/seatunnel";
         String path1 = "/tmp/seatunnel_mult/text/source_1";
         String path2 = "/tmp/seatunnel_mult/text/source_2";
+        deleteFileFromContainer(homePath + path1);
+        deleteFileFromContainer(homePath + path2);
         Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 0);
         Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 0);
         helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
@@ -183,6 +191,8 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST APPEND_DATA
         String path3 = "/tmp/seatunnel_mult2/text/source_1";
         String path4 = "/tmp/seatunnel_mult2/text/source_2";
+        deleteFileFromContainer(homePath + path3);
+        deleteFileFromContainer(homePath + path4);
         Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 0);
         Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 0);
         helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
@@ -223,6 +233,24 @@ public class FtpFileIT extends TestSuiteBase implements TestResource {
         return fileList;
     }
 
+    @SneakyThrows
+    private void deleteFileFromContainer(String path) {
+        String command = "rm -rf " + path;
+        ExecCreateCmdResponse execCreateCmdResponse =
+                dockerClient
+                        .execCreateCmd(ftpContainer.getContainerId())
+                        .withCmd("sh", "-c", command)
+                        .withAttachStdout(true)
+                        .withAttachStderr(true)
+                        .exec();
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        dockerClient
+                .execStartCmd(execCreateCmdResponse.getId())
+                .exec(new ExecStartResultCallback(outputStream, System.err))
+                .awaitCompletion();
+    }
+
     @AfterAll
     @Override
     public void tearDown() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf
new file mode 100644
index 0000000000..8fd613255c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/json/ftp_file_json_to_assert_with_multipletable.conf
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FtpFile {
+    tables_configs = [
+      {
+          host = "ftp"
+          port = 21
+          user = seatunnel
+          password = pass
+          path = "/tmp/seatunnel/read/json"
+          file_format_type = "json"
+          schema = {
+            table = "fake01"
+            fields {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+              c_row = {
+                C_MAP = "map<string, string>"
+                C_ARRAY = "array<int>"
+                C_STRING = string
+                C_BOOLEAN = boolean
+                C_TINYINT = tinyint
+                C_SMALLINT = smallint
+                C_INT = int
+                C_BIGINT = bigint
+                C_FLOAT = float
+                C_DOUBLE = double
+                C_BYTES = bytes
+                C_DATE = date
+                C_DECIMAL = "decimal(38, 18)"
+                C_TIMESTAMP = timestamp
+              }
+            }
+          }
+      },
+      {
+          host = "ftp"
+          port = 21
+          user = seatunnel
+          password = pass
+          path = "/tmp/seatunnel/read/json"
+          file_format_type = "json"
+          schema = {
+            table = "fake02"
+            fields {
+              c_map = "map<string, string>"
+              c_array = "array<int>"
+              c_string = string
+              c_boolean = boolean
+              c_tinyint = tinyint
+              c_smallint = smallint
+              c_int = int
+              c_bigint = bigint
+              c_float = float
+              c_double = double
+              c_bytes = bytes
+              c_date = date
+              c_decimal = "decimal(38, 18)"
+              c_timestamp = timestamp
+              c_row = {
+                C_MAP = "map<string, string>"
+                C_ARRAY = "array<int>"
+                C_STRING = string
+                C_BOOLEAN = boolean
+                C_TINYINT = tinyint
+                C_SMALLINT = smallint
+                C_INT = int
+                C_BIGINT = bigint
+                C_FLOAT = float
+                C_DOUBLE = double
+                C_BYTES = bytes
+                C_DATE = date
+                C_DECIMAL = "decimal(38, 18)"
+                C_TIMESTAMP = timestamp
+              }
+            }
+          }
+      }
+    ]
+    result_table_name = "ftp"
+  }
+}
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "/tmp/seatunnel/json/sink/multiplesource/${table_name}"
+    source_table_name = "ftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "json"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file

Reply via email to