This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5f0b43fcb [Improve][Connector-V2] Ensure that the FTP connector 
behaves reliably during directory operation (#8959)
b5f0b43fcb is described below

commit b5f0b43fcb6d1700014645a088cefecd333cb959
Author: corgy-w <73771213+corg...@users.noreply.github.com>
AuthorDate: Thu Mar 13 15:20:34 2025 +0800

    [Improve][Connector-V2] Ensure that the FTP connector behaves reliably 
during directory operation (#8959)
---
 .../connector-file/connector-file-ftp/pom.xml      |  10 +
 .../file/ftp/system/SeaTunnelFTPFileSystem.java    | 130 +++++++---
 .../ftp/system/SeaTunnelFTPFileSystemTest.java     | 268 +++++++++++++++++++++
 3 files changed, 372 insertions(+), 36 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 0ab14d7e04..a3b84b84c5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,6 +29,10 @@
     <artifactId>connector-file-ftp</artifactId>
     <name>SeaTunnel : Connectors V2 : File : Ftp</name>
 
+    <properties>
+        <mockftpserver.version>3.1.0</mockftpserver.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -48,6 +52,12 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.mockftpserver</groupId>
+            <artifactId>MockFtpServer</artifactId>
+            <version>${mockftpserver.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
index 963d18c703..51d10d4e97 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java
@@ -127,13 +127,19 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
      * @throws IOException IOException
      */
     private FTPClient connect() throws IOException {
-        FTPClient client = null;
+        FTPClient client = new FTPClient();
         Configuration conf = getConf();
+        // Get the connection mode from configuration, default to 
passive_local mode
+        String connectionMode =
+                conf.get(FS_FTP_CONNECTION_MODE, 
FtpConnectionMode.ACTIVE_LOCAL.getMode());
+
+        // Retrieve host, port, user, and password from configuration
         String host = conf.get(FS_FTP_HOST);
         int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
         String user = conf.get(FS_FTP_USER_PREFIX + host);
         String password = conf.get(FS_FTP_PASSWORD_PREFIX + host);
-        client = new FTPClient();
+
+        // Connect to the FTP server
         client.connect(host, port);
         int reply = client.getReplyCode();
         if (!FTPReply.isPositiveCompletion(reply)) {
@@ -143,23 +149,29 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
                     NetUtils.UNKNOWN_HOST,
                     0,
                     new ConnectException("Server response " + reply));
-        } else if (client.login(user, password)) {
-            client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
-            client.setFileType(FTP.BINARY_FILE_TYPE);
-            client.setBufferSize(DEFAULT_BUFFER_SIZE);
-        } else {
+        }
+
+        // Log in to the FTP server
+        if (!client.login(user, password)) {
             throw new IOException(
-                    "Login failed on server - "
-                            + host
-                            + ", port - "
-                            + port
-                            + " as user '"
-                            + user
-                            + "'");
+                    String.format(
+                            "Login failed on server - %s, port - %d as user 
'%s', reply code: %d",
+                            host, port, user, client.getReplyCode()));
         }
 
-        setFsFtpConnectionMode(
-                client, conf.get(FS_FTP_CONNECTION_MODE, 
FtpConnectionMode.ACTIVE_LOCAL.getMode()));
+        // Set the file type to binary and buffer size
+        client.setFileType(FTP.BINARY_FILE_TYPE);
+        client.setBufferSize(DEFAULT_BUFFER_SIZE);
+        client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
+
+        // Set the connection mode
+        setFsFtpConnectionMode(client, connectionMode);
+
+        // Log successful connection information
+        LOG.info(
+                String.format(
+                        "Successfully connected to FTP server %s:%d in %s",
+                        host, port, connectionMode));
 
         return client;
     }
@@ -170,13 +182,39 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
      * @param client FTPClient
      * @param mode mode
      */
-    private void setFsFtpConnectionMode(FTPClient client, String mode) {
-        switch (FtpConnectionMode.fromMode(mode)) {
+    private void setFsFtpConnectionMode(FTPClient client, String mode) throws 
IOException {
+        FtpConnectionMode connectionMode = FtpConnectionMode.fromMode(mode);
+        switch (connectionMode) {
             case PASSIVE_LOCAL:
                 client.enterLocalPassiveMode();
+                LOG.info("Using passive mode for FTP connection");
                 break;
             case ACTIVE_LOCAL:
-                client.enterLocalActiveMode();
+                // Create a test directory to check if active mode is working
+                String pathName = "/.ftptest" + System.currentTimeMillis();
+                try {
+                    client.enterLocalActiveMode();
+                    // test active mode is working or not
+                    boolean created = client.makeDirectory(pathName);
+                    if (!created) {
+                        LOG.warn("Active mode failed, switching to passive 
mode");
+                        throw new IOException("FTP connection active mode test 
failed");
+                    }
+
+                    LOG.info("Using active mode for FTP connection");
+                } catch (IOException e) {
+                    // if active mode failed, switch to passive mode
+                    client.enterLocalPassiveMode();
+                    // update the connection mode to passive mode
+                    getConf()
+                            .set(FS_FTP_CONNECTION_MODE, 
FtpConnectionMode.PASSIVE_LOCAL.getMode());
+                } finally {
+                    // delete the test directory if it was created
+                    FTPFile[] files = client.listFiles(pathName);
+                    if (files != null && files.length > 0) {
+                        client.deleteFile(pathName);
+                    }
+                }
                 break;
             default:
                 log.warn(
@@ -548,30 +586,50 @@ public class SeaTunnelFTPFileSystem extends FileSystem {
      */
     private boolean mkdirs(FTPClient client, Path file, FsPermission 
permission)
             throws IOException {
-        boolean created = true;
         Path workDir = new Path(client.printWorkingDirectory());
         Path absolute = makeAbsolute(workDir, file);
-        String pathName = absolute.getName();
-        if (!exists(client, absolute)) {
-            Path parent = absolute.getParent();
-            created = parent == null || mkdirs(client, parent, 
FsPermission.getDirDefault());
-            if (created) {
-                String parentDir = parent.toUri().getPath();
-                client.changeWorkingDirectory(parentDir);
-                LOG.debug("Creating directory " + pathName);
-                created = client.makeDirectory(pathName);
+        // If directory already exists, return true
+        if (exists(client, absolute)) {
+            if (isFile(client, absolute)) {
+                throw new ParentNotDirectoryException(
+                        String.format(
+                                "Can't make directory for path %s since it is 
a file.", absolute));
             }
-        } else if (isFile(client, absolute)) {
-            throw new ParentNotDirectoryException(
+            return true;
+        }
+
+        // Create parent directories if they don't exist
+        Path parent = absolute.getParent();
+        if (parent != null && !exists(client, parent)) {
+            mkdirs(client, parent, FsPermission.getDirDefault());
+        }
+
+        // Create the directory
+        String pathName = absolute.getName();
+        String parentDir = parent != null ? parent.toUri().getPath() : "/";
+
+        // Change to parent directory
+        if (!client.changeWorkingDirectory(parentDir)) {
+            throw new IOException(
                     String.format(
-                            "Can't make directory for path %s since it is a 
file.", absolute));
-        } else {
-            LOG.debug("Skipping creation of existing directory " + file);
+                            "Failed to change working directory to %s, FTP 
reply code: %d, reply string: %s",
+                            parentDir, client.getReplyCode(), 
client.getReplyString()));
         }
+        // Create directory
+        boolean created = client.makeDirectory(pathName);
         if (!created) {
-            LOG.debug("Failed to create " + file);
+            // Double check if directory was actually created (some FTP 
servers don't return true)
+            if (!exists(client, absolute)) {
+                throw new IOException(
+                        String.format(
+                                "Failed to create directory %s in %s, FTP 
reply code: %d, reply string: %s",
+                                pathName,
+                                parentDir,
+                                client.getReplyCode(),
+                                client.getReplyString()));
+            }
         }
-        return created;
+        return true;
     }
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
new file mode 100644
index 0000000000..fd6a1a9d05
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.ftp.system;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for SeaTunnelFTPFileSystem. */
+public class SeaTunnelFTPFileSystemTest {
+
+    private static final String USERNAME = "testuser";
+    private static final String PASSWORD = "testpass";
+    private static final String HOME_DIR = "/home/testuser";
+    private static final int SERVER_PORT = 0; // Use random port
+
+    private FakeFtpServer fakeFtpServer;
+    private SeaTunnelFTPFileSystem ftpFileSystem;
+    private Configuration conf;
+    private int serverPort;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // Set up the mock FTP server
+        fakeFtpServer = new FakeFtpServer();
+        fakeFtpServer.setServerControlPort(SERVER_PORT);
+
+        // Create user account
+        UserAccount userAccount = new UserAccount(USERNAME, PASSWORD, 
HOME_DIR);
+        fakeFtpServer.addUserAccount(userAccount);
+
+        // Set up the file system
+        FileSystem fileSystem = new UnixFakeFileSystem();
+        fileSystem.add(new DirectoryEntry(HOME_DIR));
+        fileSystem.add(new FileEntry(HOME_DIR + "/test.txt", "Test content"));
+        fakeFtpServer.setFileSystem(fileSystem);
+
+        // Start the FTP server
+        fakeFtpServer.start();
+        serverPort = fakeFtpServer.getServerControlPort();
+
+        // Configure the FTP client
+        conf = new Configuration();
+        conf.set("fs.ftp.host", "localhost");
+        conf.setInt("fs.ftp.host.port", serverPort);
+        conf.set("fs.ftp.user.localhost", USERNAME);
+        conf.set("fs.ftp.password.localhost", PASSWORD);
+
+        // Initialize the FTP file system
+        ftpFileSystem = new SeaTunnelFTPFileSystem();
+        ftpFileSystem.initialize(new URI("ftp://localhost:"; + serverPort), 
conf);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (fakeFtpServer != null) {
+            fakeFtpServer.stop();
+        }
+    }
+
+    @Test
+    public void testMkdirs() throws IOException {
+        Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+
+        // Create parent directories recursively
+        assertTrue(ftpFileSystem.mkdirs(testDir));
+
+        // Verify both parent and child directories exist
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+        assertTrue(ftpFileSystem.exists(testDir));
+
+        // Verify it's really a directory
+        FileStatus status = ftpFileSystem.getFileStatus(testDir);
+        assertTrue(status.isDirectory());
+    }
+
+    @Test
+    public void testCreateAndDeleteFile() throws IOException {
+        Path testFile = new Path(HOME_DIR + "/newfile.txt");
+        String content = "Hello, World!";
+
+        // Create file
+        try (FSDataOutputStream out =
+                ftpFileSystem.create(testFile, null, false, 1024, (short) 1, 
1024, null)) {
+            out.write(content.getBytes(StandardCharsets.UTF_8));
+        }
+
+        // Verify file exists
+        assertTrue(ftpFileSystem.exists(testFile));
+
+        // Read file content
+        try (FSDataInputStream in = ftpFileSystem.open(testFile, 1024)) {
+            byte[] buffer = new byte[content.length()];
+            in.readFully(buffer);
+            assertEquals(content, new String(buffer, StandardCharsets.UTF_8));
+        }
+
+        // Delete file
+        assertTrue(ftpFileSystem.delete(testFile, false));
+        assertFalse(ftpFileSystem.exists(testFile));
+    }
+
+    @Test
+    public void testListStatus() throws IOException {
+        // Create test directory structure
+        Path testDir = new Path(HOME_DIR + "/testListDir");
+        ftpFileSystem.mkdirs(testDir, null);
+
+        Path testFile1 = new Path(testDir, "file1.txt");
+        Path testFile2 = new Path(testDir, "file2.txt");
+
+        try (FSDataOutputStream out =
+                ftpFileSystem.create(testFile1, null, false, 1024, (short) 1, 
1024, null)) {
+            out.write("content1".getBytes(StandardCharsets.UTF_8));
+        }
+        try (FSDataOutputStream out =
+                ftpFileSystem.create(testFile2, null, false, 1024, (short) 1, 
1024, null)) {
+            out.write("content2".getBytes(StandardCharsets.UTF_8));
+        }
+
+        FileStatus[] statuses = ftpFileSystem.listStatus(testDir);
+        assertEquals(2, statuses.length);
+
+        // Clean up
+        ftpFileSystem.delete(testDir, true);
+    }
+
+    @Test
+    public void testRename() throws IOException {
+        Path source = new Path(HOME_DIR + "/source.txt");
+        Path target = new Path(HOME_DIR + "/target.txt");
+
+        // Create source file
+        try (FSDataOutputStream out =
+                ftpFileSystem.create(source, null, false, 1024, (short) 1, 
1024, null)) {
+            out.write("test content".getBytes(StandardCharsets.UTF_8));
+        }
+
+        // Rename file
+        assertTrue(ftpFileSystem.rename(source, target));
+        assertFalse(ftpFileSystem.exists(source));
+        assertTrue(ftpFileSystem.exists(target));
+    }
+
+    @Test
+    public void testConnectionModes() throws Exception {
+        // Test passive mode
+        conf.set("fs.ftp.connection.mode", "PASSIVE_LOCAL");
+        ftpFileSystem.initialize(new URI("ftp://localhost:"; + serverPort), 
conf);
+        Path testFile = new Path(HOME_DIR + "/passive_test.txt");
+        assertTrue(ftpFileSystem.mkdirs(testFile.getParent(), null));
+
+        // Test active mode
+        conf.set("fs.ftp.connection.mode", "ACTIVE_LOCAL");
+        ftpFileSystem.initialize(new URI("ftp://localhost:"; + serverPort), 
conf);
+        Path testFile2 = new Path(HOME_DIR + "/active_test.txt");
+        assertTrue(ftpFileSystem.mkdirs(testFile2.getParent(), null));
+    }
+
+    @Test
+    public void testMkdirsWithPermission() throws IOException {
+        Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+        FsPermission permission = FsPermission.createImmutable((short) 0755); 
// rwxr-xr-x
+
+        // Create parent directories recursively with permission
+        assertTrue(ftpFileSystem.mkdirs(testDir, permission));
+
+        // Verify both parent and child directories exist
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+        assertTrue(ftpFileSystem.exists(testDir));
+
+        // Verify it's really a directory
+        FileStatus status = ftpFileSystem.getFileStatus(testDir);
+        assertTrue(status.isDirectory());
+
+        // Verify directory was created in the mock filesystem
+        DirectoryEntry dirEntry =
+                (DirectoryEntry) 
fakeFtpServer.getFileSystem().getEntry(testDir.toString());
+        assertNotNull(dirEntry);
+    }
+
+    @Test
+    public void testMkdirsWithNullPermission() throws IOException {
+        Path testDir = new Path(HOME_DIR + "/testDir/subDir");
+
+        // Create parent directories recursively with null permission
+        assertTrue(ftpFileSystem.mkdirs(testDir, null));
+
+        // Verify both parent and child directories exist
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir")));
+        assertTrue(ftpFileSystem.exists(testDir));
+
+        // Verify it's really a directory
+        FileStatus status = ftpFileSystem.getFileStatus(testDir);
+        assertTrue(status.isDirectory());
+        // Don't verify the exact permission since it may vary by system
+        assertNotNull(status.getPermission());
+    }
+
+    @Test
+    public void testMkdirsWithNestedDirectories() throws IOException {
+        Path deepDir = new Path(HOME_DIR + "/a/b/c/d");
+        FsPermission permission = FsPermission.createImmutable((short) 0755);
+
+        // Create nested directories
+        assertTrue(ftpFileSystem.mkdirs(deepDir, permission));
+
+        // Verify all parent directories exist
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a")));
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b")));
+        assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b/c")));
+        assertTrue(ftpFileSystem.exists(deepDir));
+
+        // Verify all are directories
+        assertTrue(ftpFileSystem.getFileStatus(deepDir).isDirectory());
+    }
+
+    @Test
+    public void testMkdirsWithExistingDirectory() throws IOException {
+        Path testDir = new Path(HOME_DIR + "/existing");
+
+        // Create directory first time
+        assertTrue(ftpFileSystem.mkdirs(testDir));
+
+        // Try to create same directory again
+        assertTrue(ftpFileSystem.mkdirs(testDir));
+
+        // Verify it's still a directory
+        assertTrue(ftpFileSystem.getFileStatus(testDir).isDirectory());
+    }
+}

Reply via email to