This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b5f0b43fcb [Improve][Connector-V2] Ensure that the FTP connector behaves reliably during directory operation (#8959) b5f0b43fcb is described below commit b5f0b43fcb6d1700014645a088cefecd333cb959 Author: corgy-w <73771213+corg...@users.noreply.github.com> AuthorDate: Thu Mar 13 15:20:34 2025 +0800 [Improve][Connector-V2] Ensure that the FTP connector behaves reliably during directory operation (#8959) --- .../connector-file/connector-file-ftp/pom.xml | 10 + .../file/ftp/system/SeaTunnelFTPFileSystem.java | 130 +++++++--- .../ftp/system/SeaTunnelFTPFileSystemTest.java | 268 +++++++++++++++++++++ 3 files changed, 372 insertions(+), 36 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml index 0ab14d7e04..a3b84b84c5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml @@ -29,6 +29,10 @@ <artifactId>connector-file-ftp</artifactId> <name>SeaTunnel : Connectors V2 : File : Ftp</name> + <properties> + <mockftpserver.version>3.1.0</mockftpserver.version> + </properties> + <dependencies> <dependency> <groupId>org.apache.seatunnel</groupId> @@ -48,6 +52,12 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.mockftpserver</groupId> + <artifactId>MockFtpServer</artifactId> + <version>${mockftpserver.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java index 963d18c703..51d10d4e97 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java @@ -127,13 +127,19 @@ public class SeaTunnelFTPFileSystem extends FileSystem { * @throws IOException IOException */ private FTPClient connect() throws IOException { - FTPClient client = null; + FTPClient client = new FTPClient(); Configuration conf = getConf(); + // Get the connection mode from configuration, default to passive_local mode + String connectionMode = + conf.get(FS_FTP_CONNECTION_MODE, FtpConnectionMode.ACTIVE_LOCAL.getMode()); + + // Retrieve host, port, user, and password from configuration String host = conf.get(FS_FTP_HOST); int port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT); String user = conf.get(FS_FTP_USER_PREFIX + host); String password = conf.get(FS_FTP_PASSWORD_PREFIX + host); - client = new FTPClient(); + + // Connect to the FTP server client.connect(host, port); int reply = client.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { @@ -143,23 +149,29 @@ public class SeaTunnelFTPFileSystem extends FileSystem { NetUtils.UNKNOWN_HOST, 0, new ConnectException("Server response " + reply)); - } else if (client.login(user, password)) { - client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); - client.setFileType(FTP.BINARY_FILE_TYPE); - client.setBufferSize(DEFAULT_BUFFER_SIZE); - } else { + } + + // Log in to the FTP server + if (!client.login(user, password)) { throw new IOException( - "Login failed on server - " - + host - + ", port - " - + port - + " as user '" - + user - + "'"); + String.format( + "Login failed on server - %s, port - %d as user '%s', reply code: %d", + host, port, user, client.getReplyCode())); } - setFsFtpConnectionMode( - client, conf.get(FS_FTP_CONNECTION_MODE, FtpConnectionMode.ACTIVE_LOCAL.getMode())); + // Set the file type to binary and buffer size + client.setFileType(FTP.BINARY_FILE_TYPE); + client.setBufferSize(DEFAULT_BUFFER_SIZE); + client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); + + // Set the connection mode + setFsFtpConnectionMode(client, connectionMode); + + // Log successful connection information + LOG.info( + String.format( + "Successfully connected to FTP server %s:%d in %s", + host, port, connectionMode)); return client; } @@ -170,13 +182,39 @@ public class SeaTunnelFTPFileSystem extends FileSystem { * @param client FTPClient * @param mode mode */ - private void setFsFtpConnectionMode(FTPClient client, String mode) { - switch (FtpConnectionMode.fromMode(mode)) { + private void setFsFtpConnectionMode(FTPClient client, String mode) throws IOException { + FtpConnectionMode connectionMode = FtpConnectionMode.fromMode(mode); + switch (connectionMode) { case PASSIVE_LOCAL: client.enterLocalPassiveMode(); + LOG.info("Using passive mode for FTP connection"); break; case ACTIVE_LOCAL: - client.enterLocalActiveMode(); + // Create a test directory to check if active mode is working + String pathName = "/.ftptest" + System.currentTimeMillis(); + try { + client.enterLocalActiveMode(); + // test active mode is working or not + boolean created = client.makeDirectory(pathName); + if (!created) { + LOG.warn("Active mode failed, switching to passive mode"); + throw new IOException("FTP connection active mode test failed"); + } + + LOG.info("Using active mode for FTP connection"); + } catch (IOException e) { + // if active mode failed, switch to passive mode + client.enterLocalPassiveMode(); + // update the connection mode to passive mode + getConf() + .set(FS_FTP_CONNECTION_MODE, FtpConnectionMode.PASSIVE_LOCAL.getMode()); + } finally { + // delete the test directory if it was created + FTPFile[] files = client.listFiles(pathName); + if (files != null && files.length > 0) { + client.deleteFile(pathName); + } + } break; default: log.warn( @@ -548,30 +586,50 @@ public class SeaTunnelFTPFileSystem extends FileSystem { */ private boolean mkdirs(FTPClient client, Path file, FsPermission permission) throws IOException { - boolean created = true; Path workDir = new Path(client.printWorkingDirectory()); Path absolute = makeAbsolute(workDir, file); - String pathName = absolute.getName(); - if (!exists(client, absolute)) { - Path parent = absolute.getParent(); - created = parent == null || mkdirs(client, parent, FsPermission.getDirDefault()); - if (created) { - String parentDir = parent.toUri().getPath(); - client.changeWorkingDirectory(parentDir); - LOG.debug("Creating directory " + pathName); - created = client.makeDirectory(pathName); + // If directory already exists, return true + if (exists(client, absolute)) { + if (isFile(client, absolute)) { + throw new ParentNotDirectoryException( + String.format( + "Can't make directory for path %s since it is a file.", absolute)); } - } else if (isFile(client, absolute)) { - throw new ParentNotDirectoryException( + return true; + } + + // Create parent directories if they don't exist + Path parent = absolute.getParent(); + if (parent != null && !exists(client, parent)) { + mkdirs(client, parent, FsPermission.getDirDefault()); + } + + // Create the directory + String pathName = absolute.getName(); + String parentDir = parent != null ? parent.toUri().getPath() : "/"; + + // Change to parent directory + if (!client.changeWorkingDirectory(parentDir)) { + throw new IOException( String.format( - "Can't make directory for path %s since it is a file.", absolute)); - } else { - LOG.debug("Skipping creation of existing directory " + file); + "Failed to change working directory to %s, FTP reply code: %d, reply string: %s", + parentDir, client.getReplyCode(), client.getReplyString())); } + // Create directory + boolean created = client.makeDirectory(pathName); if (!created) { - LOG.debug("Failed to create " + file); + // Double check if directory was actually created (some FTP servers don't return true) + if (!exists(client, absolute)) { + throw new IOException( + String.format( + "Failed to create directory %s in %s, FTP reply code: %d, reply string: %s", + pathName, + parentDir, + client.getReplyCode(), + client.getReplyString())); + } } - return created; + return true; } /** diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java new file mode 100644 index 0000000000..fd6a1a9d05 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystemTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.system; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockftpserver.fake.FakeFtpServer; +import org.mockftpserver.fake.UserAccount; +import org.mockftpserver.fake.filesystem.DirectoryEntry; +import org.mockftpserver.fake.filesystem.FileEntry; +import org.mockftpserver.fake.filesystem.FileSystem; +import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Unit tests for SeaTunnelFTPFileSystem. */ +public class SeaTunnelFTPFileSystemTest { + + private static final String USERNAME = "testuser"; + private static final String PASSWORD = "testpass"; + private static final String HOME_DIR = "/home/testuser"; + private static final int SERVER_PORT = 0; // Use random port + + private FakeFtpServer fakeFtpServer; + private SeaTunnelFTPFileSystem ftpFileSystem; + private Configuration conf; + private int serverPort; + + @BeforeEach + public void setUp() throws Exception { + // Set up the mock FTP server + fakeFtpServer = new FakeFtpServer(); + fakeFtpServer.setServerControlPort(SERVER_PORT); + + // Create user account + UserAccount userAccount = new UserAccount(USERNAME, PASSWORD, HOME_DIR); + fakeFtpServer.addUserAccount(userAccount); + + // Set up the file system + FileSystem fileSystem = new UnixFakeFileSystem(); + fileSystem.add(new DirectoryEntry(HOME_DIR)); + fileSystem.add(new FileEntry(HOME_DIR + "/test.txt", "Test content")); + fakeFtpServer.setFileSystem(fileSystem); + + // Start the FTP server + fakeFtpServer.start(); + serverPort = fakeFtpServer.getServerControlPort(); + + // Configure the FTP client + conf = new Configuration(); + conf.set("fs.ftp.host", "localhost"); + conf.setInt("fs.ftp.host.port", serverPort); + conf.set("fs.ftp.user.localhost", USERNAME); + conf.set("fs.ftp.password.localhost", PASSWORD); + + // Initialize the FTP file system + ftpFileSystem = new SeaTunnelFTPFileSystem(); + ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort), conf); + } + + @AfterEach + public void tearDown() { + if (fakeFtpServer != null) { + fakeFtpServer.stop(); + } + } + + @Test + public void testMkdirs() throws IOException { + Path testDir = new Path(HOME_DIR + "/testDir/subDir"); + + // Create parent directories recursively + assertTrue(ftpFileSystem.mkdirs(testDir)); + + // Verify both parent and child directories exist + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir"))); + assertTrue(ftpFileSystem.exists(testDir)); + + // Verify it's really a directory + FileStatus status = ftpFileSystem.getFileStatus(testDir); + assertTrue(status.isDirectory()); + } + + @Test + public void testCreateAndDeleteFile() throws IOException { + Path testFile = new Path(HOME_DIR + "/newfile.txt"); + String content = "Hello, World!"; + + // Create file + try (FSDataOutputStream out = + ftpFileSystem.create(testFile, null, false, 1024, (short) 1, 1024, null)) { + out.write(content.getBytes(StandardCharsets.UTF_8)); + } + + // Verify file exists + assertTrue(ftpFileSystem.exists(testFile)); + + // Read file content + try (FSDataInputStream in = ftpFileSystem.open(testFile, 1024)) { + byte[] buffer = new byte[content.length()]; + in.readFully(buffer); + assertEquals(content, new String(buffer, StandardCharsets.UTF_8)); + } + + // Delete file + assertTrue(ftpFileSystem.delete(testFile, false)); + assertFalse(ftpFileSystem.exists(testFile)); + } + + @Test + public void testListStatus() throws IOException { + // Create test directory structure + Path testDir = new Path(HOME_DIR + "/testListDir"); + ftpFileSystem.mkdirs(testDir, null); + + Path testFile1 = new Path(testDir, "file1.txt"); + Path testFile2 = new Path(testDir, "file2.txt"); + + try (FSDataOutputStream out = + ftpFileSystem.create(testFile1, null, false, 1024, (short) 1, 1024, null)) { + out.write("content1".getBytes(StandardCharsets.UTF_8)); + } + try (FSDataOutputStream out = + ftpFileSystem.create(testFile2, null, false, 1024, (short) 1, 1024, null)) { + out.write("content2".getBytes(StandardCharsets.UTF_8)); + } + + FileStatus[] statuses = ftpFileSystem.listStatus(testDir); + assertEquals(2, statuses.length); + + // Clean up + ftpFileSystem.delete(testDir, true); + } + + @Test + public void testRename() throws IOException { + Path source = new Path(HOME_DIR + "/source.txt"); + Path target = new Path(HOME_DIR + "/target.txt"); + + // Create source file + try (FSDataOutputStream out = + ftpFileSystem.create(source, null, false, 1024, (short) 1, 1024, null)) { + out.write("test content".getBytes(StandardCharsets.UTF_8)); + } + + // Rename file + assertTrue(ftpFileSystem.rename(source, target)); + assertFalse(ftpFileSystem.exists(source)); + assertTrue(ftpFileSystem.exists(target)); + } + + @Test + public void testConnectionModes() throws Exception { + // Test passive mode + conf.set("fs.ftp.connection.mode", "PASSIVE_LOCAL"); + ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort), conf); + Path testFile = new Path(HOME_DIR + "/passive_test.txt"); + assertTrue(ftpFileSystem.mkdirs(testFile.getParent(), null)); + + // Test active mode + conf.set("fs.ftp.connection.mode", "ACTIVE_LOCAL"); + ftpFileSystem.initialize(new URI("ftp://localhost:" + serverPort), conf); + Path testFile2 = new Path(HOME_DIR + "/active_test.txt"); + assertTrue(ftpFileSystem.mkdirs(testFile2.getParent(), null)); + } + + @Test + public void testMkdirsWithPermission() throws IOException { + Path testDir = new Path(HOME_DIR + "/testDir/subDir"); + FsPermission permission = FsPermission.createImmutable((short) 0755); // rwxr-xr-x + + // Create parent directories recursively with permission + assertTrue(ftpFileSystem.mkdirs(testDir, permission)); + + // Verify both parent and child directories exist + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir"))); + assertTrue(ftpFileSystem.exists(testDir)); + + // Verify it's really a directory + FileStatus status = ftpFileSystem.getFileStatus(testDir); + assertTrue(status.isDirectory()); + + // Verify directory was created in the mock filesystem + DirectoryEntry dirEntry = + (DirectoryEntry) fakeFtpServer.getFileSystem().getEntry(testDir.toString()); + assertNotNull(dirEntry); + } + + @Test + public void testMkdirsWithNullPermission() throws IOException { + Path testDir = new Path(HOME_DIR + "/testDir/subDir"); + + // Create parent directories recursively with null permission + assertTrue(ftpFileSystem.mkdirs(testDir, null)); + + // Verify both parent and child directories exist + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/testDir"))); + assertTrue(ftpFileSystem.exists(testDir)); + + // Verify it's really a directory + FileStatus status = ftpFileSystem.getFileStatus(testDir); + assertTrue(status.isDirectory()); + // Don't verify the exact permission since it may vary by system + assertNotNull(status.getPermission()); + } + + @Test + public void testMkdirsWithNestedDirectories() throws IOException { + Path deepDir = new Path(HOME_DIR + "/a/b/c/d"); + FsPermission permission = FsPermission.createImmutable((short) 0755); + + // Create nested directories + assertTrue(ftpFileSystem.mkdirs(deepDir, permission)); + + // Verify all parent directories exist + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a"))); + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b"))); + assertTrue(ftpFileSystem.exists(new Path(HOME_DIR + "/a/b/c"))); + assertTrue(ftpFileSystem.exists(deepDir)); + + // Verify all are directories + assertTrue(ftpFileSystem.getFileStatus(deepDir).isDirectory()); + } + + @Test + public void testMkdirsWithExistingDirectory() throws IOException { + Path testDir = new Path(HOME_DIR + "/existing"); + + // Create directory first time + assertTrue(ftpFileSystem.mkdirs(testDir)); + + // Try to create same directory again + assertTrue(ftpFileSystem.mkdirs(testDir)); + + // Verify it's still a directory + assertTrue(ftpFileSystem.getFileStatus(testDir).isDirectory()); + } +}