This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f6bcc4d59d [Feature][Connectors-v2-file-ftp] FTP source/sink add ftp 
connection mode (#6077)  (#6099)
f6bcc4d59d is described below

commit f6bcc4d59d91e6c47f0726fa6051b6c324ae5c2e
Author: mingbei.xu <41814775+williamtan...@users.noreply.github.com>
AuthorDate: Tue Jan 2 18:01:36 2024 +0800

    [Feature][Connectors-v2-file-ftp] FTP source/sink add ftp connection mode 
(#6077)  (#6099)
---
 docs/en/connector-v2/sink/FtpFile.md               |  7 ++
 docs/en/connector-v2/source/FtpFile.md             |  7 ++
 .../seatunnel/file/config/BaseFileSinkConfig.java  |  5 ++
 .../seatunnel/file/ftp/config/FtpConf.java         |  5 ++
 .../file/ftp/config/FtpConfigOptions.java          |  8 ++
 .../file/ftp/sink/FtpFileSinkFactory.java          |  1 +
 .../file/ftp/source/FtpFileSourceFactory.java      |  1 +
 .../file/ftp/system/FtpConnectionMode.java         | 47 ++++++++++++
 .../file/ftp/system/SeaTunnelFTPFileSystem.java    | 27 +++++++
 .../e2e/connector/file/ftp/FtpFileIT.java          |  5 ++
 .../excel/fake_source_to_ftp_root_path_excel.conf  | 85 ++++++++++++++++++++++
 11 files changed, 198 insertions(+)

diff --git a/docs/en/connector-v2/sink/FtpFile.md 
b/docs/en/connector-v2/sink/FtpFile.md
index ab55b6e4da..3233fc3c6d 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -38,6 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | password                         | string  | yes      | -                    
                      |                                                         
                                                          |
 | path                             | string  | yes      | -                    
                      |                                                         
                                                          |
 | tmp_path                         | string  | yes      | /tmp/seatunnel       
                      | The result file will write to a tmp path first and then 
use `mv` to submit tmp dir to target dir. Need a FTP dir. |
+| connection_mode                  | string  | no       | active_local         
                      | The target ftp connection mode                          
                                                          |
 | custom_filename                  | boolean | no       | false                
                      | Whether you need custom the filename                    
                                                          |
 | file_name_expression             | string  | no       | "${transactionId}"   
                      | Only used when custom_filename is true                  
                                                          |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"         
                      | Only used when custom_filename is true                  
                                                          |
@@ -76,6 +77,12 @@ The target ftp password is required
 
 The target dir path is required.
 
+### connection_mode [string]
+
+The target ftp connection mode , default is active mode, supported as the 
following modes:
+
+`active_local` `passive_local`
+
 ### custom_filename [boolean]
 
 Whether custom the filename
diff --git a/docs/en/connector-v2/source/FtpFile.md 
b/docs/en/connector-v2/source/FtpFile.md
index 781d7d40bc..ee231bb087 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -44,6 +44,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
 | file_format_type          | string  | yes      | -                   |
+| connection_mode           | string  | no       | active_local        |
 | delimiter/field_delimiter | string  | no       | \001                |
 | read_columns              | list    | no       | -                   |
 | parse_partition_from_path | boolean | no       | true                |
@@ -154,6 +155,12 @@ connector will generate data as the following:
 |---------------|-----|--------|
 | tyrantlucifer | 26  | male   |
 
+### connection_mode [string]
+
+The target ftp connection mode , default is active mode, supported as the 
following modes:
+
+`active_local` `passive_local`
+
 ### delimiter/field_delimiter [string]
 
 **delimiter** parameter will deprecate after version 2.3.5, please use 
**field_delimiter** instead.
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 112ab9fa1c..3a6513e993 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import lombok.Data;
 import lombok.NonNull;
 
+import java.io.File;
 import java.io.Serializable;
 import java.util.Locale;
 
@@ -71,6 +72,10 @@ public class BaseFileSinkConfig implements DelimiterConfig, 
Serializable {
         }
         checkNotNull(path);
 
+        if (path.equals(File.separator)) {
+            this.path = "";
+        }
+
         if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key())
                 && !StringUtils.isBlank(
                         
config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
index 7ab43b6db1..9186e1d8ee 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java
@@ -52,6 +52,11 @@ public class FtpConf extends HadoopConf {
                 "fs.ftp.user." + host, 
config.getString(FtpConfigOptions.FTP_USERNAME.key()));
         ftpOptions.put(
                 "fs.ftp.password." + host, 
config.getString(FtpConfigOptions.FTP_PASSWORD.key()));
+        if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) {
+            ftpOptions.put(
+                    "fs.ftp.connection.mode",
+                    
config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key()));
+        }
         hadoopConf.setExtraOptions(ftpOptions);
         return hadoopConf;
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
index 2834b7ac2f..1f00a56abf 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java
@@ -20,6 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.ftp.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE;
 
 public class FtpConfigOptions extends BaseSourceConfigOptions {
     public static final Option<String> FTP_PASSWORD =
@@ -36,4 +39,9 @@ public class FtpConfigOptions extends BaseSourceConfigOptions 
{
             
Options.key("host").stringType().noDefaultValue().withDescription("FTP server 
host");
     public static final Option<Integer> FTP_PORT =
             
Options.key("port").intType().noDefaultValue().withDescription("FTP server 
port");
+    public static final Option<FtpConnectionMode> FTP_CONNECTION_MODE =
+            Options.key("connection_mode")
+                    .enumType(FtpConnectionMode.class)
+                    .defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE)
+                    .withDescription("FTP server connection mode ");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 2cc9a06a5f..a3fbf886fb 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -85,6 +85,7 @@ public class FtpFileSinkFactory implements TableSinkFactory {
                 .optional(BaseSinkConfig.DATE_FORMAT)
                 .optional(BaseSinkConfig.DATETIME_FORMAT)
                 .optional(BaseSinkConfig.TIME_FORMAT)
+                .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index e15afac55a..529c93a3f7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -62,6 +62,7 @@ public class FtpFileSourceFactory implements 
TableSourceFactory {
                 .optional(BaseSourceConfigOptions.TIME_FORMAT)
                 .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
                 .optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+                .optional(FtpConfigOptions.FTP_CONNECTION_MODE)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
new file mode 100644
index 0000000000..068aa5974c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
+
+/** Ftp connection mode enum. href="http://commons.apache.org/net/";>Apache 
Commons Net</a>. */
+public enum FtpConnectionMode {
+
+    /** ACTIVE_LOCAL_DATA_CONNECTION_MODE */
+    ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"),
+
+    /** PASSIVE_LOCAL_DATA_CONNECTION_MODE */
+    PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local");
+
+    private final String mode;
+
+    FtpConnectionMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public static FtpConnectionMode fromMode(String mode) {
+        for (FtpConnectionMode ftpConnectionModeEnum : 
FtpConnectionMode.values()) {
+            if (ftpConnectionModeEnum.getMode().equals(mode)) {
+                return ftpConnectionModeEnum;
+            }
+        }
+        throw new IllegalArgumentException("Unknown ftp connection mode: " + 
mode);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 4b69c63416..04ba218e45 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -62,6 +62,8 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
     public static final String FS_FTP_HOST = "fs.ftp.host";
     public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
     public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
+    public static final String FS_FTP_CONNECTION_MODE = 
"fs.ftp.connection.mode";
+
     public static final String E_SAME_DIRECTORY_ONLY = "only same directory 
renames are supported";
 
     private URI uri;
@@ -153,9 +155,34 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
                             + "'");
         }
 
+        setFsFtpConnectionMode(
+                client,
+                conf.get(
+                        FS_FTP_CONNECTION_MODE,
+                        
FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode()));
+
         return client;
     }
 
+    /**
+     * Set FTP connection mode. *
+     *
+     * @param client FTPClient
+     * @param mode mode
+     */
+    private void setFsFtpConnectionMode(FTPClient client, String mode) {
+        switch (FtpConnectionMode.fromMode(mode)) {
+            case ACTIVE_LOCAL_DATA_CONNECTION_MODE:
+                client.enterLocalActiveMode();
+                break;
+            case PASSIVE_LOCAL_DATA_CONNECTION_MODE:
+                client.enterLocalPassiveMode();
+                break;
+            default:
+                break;
+        }
+    }
+
     /**
      * Logout and disconnect the given FTPClient. *
      *
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
index 15a58ebf08..2a1598bf32 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java
@@ -104,6 +104,9 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
                 
"/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                 ftpContainer);
 
+        ContainerUtil.copyFileIntoContainers(
+                "/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx", 
ftpContainer);
+
         ftpContainer.execInContainer("sh", "-c", "chmod -R 777 
/home/vsftpd/seatunnel/");
         ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp 
/home/vsftpd/seatunnel/");
     }
@@ -136,6 +139,8 @@ public class FtpFileIT extends TestSuiteBase implements 
TestResource {
         helper.execute("/parquet/fake_to_ftp_file_parquet.conf");
         // test write ftp orc file
         helper.execute("/orc/fake_to_ftp_file_orc.conf");
+        // test write ftp root path excel file
+        helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf");
     }
 
     @AfterAll
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
new file mode 100644
index 0000000000..3e11b0a08f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "ftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  FtpFile {
+    host = "ftp"
+    port = 21
+    user = seatunnel
+    password = pass
+    path = "/"
+    source_table_name = "ftp"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "excel"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}

Reply via email to