This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f812e12ae [Feature][Connector-V2] Ftp file sink suport multiple table 
and save mode (#7665)
4f812e12ae is described below

commit 4f812e12aeb20c301bdc5bc5fc93b47b6b2cee50
Author: 老王 <58297137+chl-...@users.noreply.github.com>
AuthorDate: Sat Sep 14 21:42:45 2024 +0800

    [Feature][Connector-V2] Ftp file sink suport multiple table and save mode 
(#7665)
---
 docs/en/connector-v2/sink/FtpFile.md               |  42 +++++++++
 .../seatunnel/file/ftp/catalog/FtpFileCatalog.java |  29 ++++++
 .../file/ftp/catalog/FtpFileCatalogFactory.java    |  53 +++++++++++
 .../seatunnel/file/ftp/config/FtpConf.java         |  25 ++---
 .../seatunnel/file/ftp/sink/FtpFileSink.java       |  39 ++------
 .../file/ftp/sink/FtpFileSinkFactory.java          |  25 ++++-
 .../seatunnel/file/ftp/source/FtpFileSource.java   |   3 +-
 .../e2e/connector/file/ftp/FtpFileIT.java          |  73 ++++++++++++++
 .../text/multiple_table_fake_to_ftp_file_text.conf | 105 +++++++++++++++++++++
 .../multiple_table_fake_to_ftp_file_text_2.conf    | 105 +++++++++++++++++++++
 10 files changed, 451 insertions(+), 48 deletions(-)

diff --git a/docs/en/connector-v2/sink/FtpFile.md 
b/docs/en/connector-v2/sink/FtpFile.md
index 9305aa7e99..5b927bda12 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once`
 | parquet_avro_write_timestamp_as_int96 | boolean | no       | false           
                           | Only used when file_format is parquet.             
                                                               |
 | parquet_avro_write_fixed_as_int96     | array   | no       | -               
                           | Only used when file_format is parquet.             
                                                               |
 | encoding                              | string  | no       | "UTF-8"         
                           | Only used when file_format_type is 
json,text,csv,xml.                                                             |
+| schema_save_mode                      | string  | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST               | Existing dir processing method     
                                                               |
+| data_save_mode                        | string  | no       | APPEND_DATA     
                           | Existing data processing method                    
                                               |
 
 ### host [string]
 
@@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only 
valid for parquet files
 Only used when file_format_type is json,text,csv,xml.
 The encoding of the file to write. This param will be parsed by 
`Charset.forName(encoding)`.
 
+### schema_save_mode [string]
+Existing dir processing method.
+- RECREATE_SCHEMA: will create when the dir does not exist, delete and 
recreate when the dir is exist
+- CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, 
skipped when the dir is exist
+- ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not 
exist
+- IGNORE :Ignore the treatment of the table
+
+### data_save_mode [string]
+Existing data processing method.
+- DROP_DATA: preserve dir and delete data files
+- APPEND_DATA: preserve dir, preserve data files
+- ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported
 ## Example
 
 For text file format simple config
@@ -273,6 +287,34 @@ FtpFile {
 
 ```
 
+When our source end is multiple tables, and wants different expressions to 
different directory, we can configure this way
+
+```hocon
+
+FtpFile {
+    host = "xxx.xxx.xxx.xxx"
+    port = 21
+    user = "username"
+    password = "password"
+    path = "/data/ftp/seatunnel/job1/${table_name}"
+    tmp_path = "/data/ftp/seatunnel/tmp"
+    file_format_type = "text"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    have_partition = true
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    custom_filename = true
+    file_name_expression = "${transactionId}_${now}"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    schema_save_mode=RECREATE_SCHEMA
+    data_save_mode=DROP_DATA
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
new file mode 100644
index 0000000000..2bf0bf49e5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;
+
+import 
org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+public class FtpFileCatalog extends AbstractFileCatalog {
+
+    public FtpFileCatalog(
+            HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, 
String catalogName) {
+        super(hadoopFileSystemProxy, filePath, catalogName);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
new file mode 100644
index 0000000000..74f05c12d7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class FtpFileCatalogFactory implements CatalogFactory {
+    @Override
+    public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+        HadoopFileSystemProxy fileSystemUtils =
+                new HadoopFileSystemProxy(FtpConf.buildWithConfig(options));
+        return new FtpFileCatalog(
+                fileSystemUtils,
+                options.get(BaseSourceConfigOptions.FILE_PATH),
+                FileSystemType.FTP.getFileSystemPluginName());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.FTP.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index 9186e1d8ee..bd98800c54 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -17,18 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
 
 import java.util.HashMap;
+import java.util.Optional;
 
 public class FtpConf extends HadoopConf {
     private static final String HDFS_IMPL =
             
"org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem";
     private static final String SCHEMA = "ftp";
 
-    private FtpConf(String hdfsNameKey) {
+    public FtpConf(String hdfsNameKey) {
         super(hdfsNameKey);
     }
 
@@ -42,20 +43,20 @@ public class FtpConf extends HadoopConf {
         return SCHEMA;
     }
 
-    public static HadoopConf buildWithConfig(Config config) {
-        String host = config.getString(FtpConfigOptions.FTP_HOST.key());
-        int port = config.getInt(FtpConfigOptions.FTP_PORT.key());
+    public static HadoopConf buildWithConfig(ReadonlyConfig config) {
+        String host = config.get(FtpConfigOptions.FTP_HOST);
+        int port = config.get(FtpConfigOptions.FTP_PORT);
         String defaultFS = String.format("ftp://%s:%s";, host, port);
         HadoopConf hadoopConf = new FtpConf(defaultFS);
         HashMap<String, String> ftpOptions = new HashMap<>();
-        ftpOptions.put(
-                "fs.ftp.user." + host, 
config.getString(FtpConfigOptions.FTP_USERNAME.key()));
-        ftpOptions.put(
-                "fs.ftp.password." + host, 
config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
-        if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
+        ftpOptions.put("fs.ftp.user." + host, 
config.get(FtpConfigOptions.FTP_USERNAME));
+        ftpOptions.put("fs.ftp.password." + host, 
config.get(FtpConfigOptions.FTP_PASSWORD));
+        Optional<FtpConnectionMode> optional =
+                config.getOptional(FtpConfigOptions.FTP_CONNECTION_MODE);
+        if (optional.isPresent()) {
             ftpOptions.put(
                     "fs.ftp.connection.mode",
-                    
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
+                    
config.get(FtpConfigOptions.FTP_CONNECTION_MODE).toString());
         }
         hadoopConf.setExtraOptions(ftpOptions);
         return hadoopConf;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index 031d442f20..f4b271e035 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -17,46 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-
-import com.google.auto.service.AutoService;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink;
 
-@AutoService(SeaTunnelSink.class)
-public class FtpFileSink extends BaseFileSink {
+public class FtpFileSink extends BaseMultipleTableFileSink {
     @Override
     public String getPluginName() {
         return FileSystemType.FTP.getFileSystemPluginName();
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        FtpConfigOptions.FTP_HOST.key(),
-                        FtpConfigOptions.FTP_PORT.key(),
-                        FtpConfigOptions.FTP_USERNAME.key(),
-                        FtpConfigOptions.FTP_PASSWORD.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        super.prepare(pluginConfig);
-        hadoopConf = FtpConf.buildWithConfig(pluginConfig);
+    public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable 
catalogTable) {
+        super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, 
catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 24a9ed48f8..cfd2351a5c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -17,18 +17,27 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkCommonOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
-public class FtpFileSinkFactory implements TableSinkFactory {
+public class FtpFileSinkFactory extends BaseMultipleTableFileSinkFactory {
     @Override
     public String factoryIdentifier() {
         return FileSystemType.FTP.getFileSystemPluginName();
@@ -42,7 +51,11 @@ public class FtpFileSinkFactory implements TableSinkFactory {
                 .required(FtpConfigOptions.FTP_PORT)
                 .required(FtpConfigOptions.FTP_USERNAME)
                 .required(FtpConfigOptions.FTP_PASSWORD)
+                .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
+                .optional(BaseSinkConfig.TMP_PATH)
                 .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+                .optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
+                .optional(BaseSinkConfig.DATA_SAVE_MODE)
                 .conditional(
                         BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
@@ -94,4 +107,12 @@ public class FtpFileSinkFactory implements TableSinkFactory 
{
                 .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new FtpFileSink(readonlyConfig, catalogTable);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index b032717cab..d6f0f64abb 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -78,7 +79,7 @@ public class FtpFileSource extends BaseFileSource {
                     "Ftp file source connector only support read [text, csv, 
json] files");
         }
         String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key());
-        hadoopConf = FtpConf.buildWithConfig(pluginConfig);
+        hadoopConf = 
FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig));
         readStrategy =
                 ReadStrategyFactory.of(
                         
pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key()));
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 2a1598bf32..1b89a0bcc7 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -25,17 +25,27 @@ import org.apache.seatunnel.e2e.common.container.TestHelper;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import 
org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback;
 
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.Stream;
 
 @DisabledOnContainer(
@@ -143,6 +153,69 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
         helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.FLINK},
+            disabledReason = "Flink dosen't support multi-table at now")
+    public void testMultipleTableAndSaveMode(TestContainer container)
+            throws IOException, InterruptedException {
+        TestHelper helper = new TestHelper(container);
+        // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA
+        String homePath = "/home/vsftpd/seatunnel";
+        String path1 = "/tmp/seatunnel_mult/text/source_1";
+        String path2 = "/tmp/seatunnel_mult/text/source_2";
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path1).size(), 0);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path2).size(), 0);
+        helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path1).size(), 1);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path2).size(), 1);
+        helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf");
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path1).size(), 1);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path2).size(), 1);
+        // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST 
APPEND_DATA
+        String path3 = "/tmp/seatunnel_mult2/text/source_1";
+        String path4 = "/tmp/seatunnel_mult2/text/source_2";
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path3).size(), 0);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path4).size(), 0);
+        helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path3).size(), 1);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path4).size(), 1);
+        helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf");
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path3).size(), 2);
+        Assertions.assertEquals(getFileListFromContainer(homePath + 
path4).size(), 2);
+    }
+
+    @SneakyThrows
+    private List<String> getFileListFromContainer(String path) {
+        String command = "ls -1 " + path;
+        ExecCreateCmdResponse execCreateCmdResponse =
+                dockerClient
+                        .execCreateCmd(ftpContainer.getContainerId())
+                        .withCmd("sh", "-c", command)
+                        .withAttachStdout(true)
+                        .withAttachStderr(true)
+                        .exec();
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        dockerClient
+                .execStartCmd(execCreateCmdResponse.getId())
+                .exec(new ExecStartResultCallback(outputStream, System.err))
+                .awaitCompletion();
+
+        String output = new String(outputStream.toByteArray(), 
StandardCharsets.UTF_8).trim();
+        List<String> fileList = new ArrayList<>();
+        log.info("container path file list is :{}", output);
+        String[] files = output.split("\n");
+        for (String file : files) {
+            if (StringUtils.isNotEmpty(file)) {
+                log.info("container path file name is :{}", file);
+                fileList.add(file);
+            }
+        }
+        return fileList;
+    }
+
     @AfterAll
     @Override
     public void tearDown() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
new file mode 100644
index 0000000000..cd28e54399
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "ftp"
+    tables_configs = [
+       {
+        schema = {
+          table = "source_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "source_2"
+              fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "/tmp/seatunnel_mult/text/${table_name}"
+    source_table_name = "ftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+    "schema_save_mode"="RECREATE_SCHEMA"
+    "data_save_mode"="DROP_DATA"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
new file mode 100644
index 0000000000..e05a14ab86
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "ftp"
+    tables_configs = [
+       {
+        schema = {
+          table = "source_1"
+         fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+                val_string = string
+      }
+        }
+            rows = [
+              {
+                kind = INSERT
+                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"]
+              }
+              ]
+       },
+       {
+       schema = {
+         table = "source_2"
+              fields {
+                id = int
+                val_bool = boolean
+                val_tinyint = tinyint
+                val_smallint = smallint
+                val_int = int
+                val_bigint = bigint
+                val_float = float
+                val_double = double
+                val_decimal = "decimal(16, 1)"
+              }
+       }
+           rows = [
+             {
+               kind = INSERT
+               fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3]
+             }
+             ]
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "/tmp/seatunnel_mult2/text/${table_name}"
+    source_table_name = "ftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+    "data_save_mode"="APPEND_DATA"
+  }
+}
\ No newline at end of file

Reply via email to