This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 4f812e12ae [Feature][Connector-V2] Ftp file sink suport multiple table and save mode (#7665) 4f812e12ae is described below commit 4f812e12aeb20c301bdc5bc5fc93b47b6b2cee50 Author: 老王 <58297137+chl-...@users.noreply.github.com> AuthorDate: Sat Sep 14 21:42:45 2024 +0800 [Feature][Connector-V2] Ftp file sink suport multiple table and save mode (#7665) --- docs/en/connector-v2/sink/FtpFile.md | 42 +++++++++ .../seatunnel/file/ftp/catalog/FtpFileCatalog.java | 29 ++++++ .../file/ftp/catalog/FtpFileCatalogFactory.java | 53 +++++++++++ .../seatunnel/file/ftp/config/FtpConf.java | 25 ++--- .../seatunnel/file/ftp/sink/FtpFileSink.java | 39 ++------ .../file/ftp/sink/FtpFileSinkFactory.java | 25 ++++- .../seatunnel/file/ftp/source/FtpFileSource.java | 3 +- .../e2e/connector/file/ftp/FtpFileIT.java | 73 ++++++++++++++ .../text/multiple_table_fake_to_ftp_file_text.conf | 105 +++++++++++++++++++++ .../multiple_table_fake_to_ftp_file_text_2.conf | 105 +++++++++++++++++++++ 10 files changed, 451 insertions(+), 48 deletions(-) diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 9305aa7e99..5b927bda12 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -64,6 +64,8 @@ By default, we use 2PC commit to ensure `exactly-once` | parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet. | | parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet. | | encoding | string | no | "UTF-8" | Only used when file_format_type is json,text,csv,xml. 
| +| schema_save_mode | string | no | CREATE_SCHEMA_WHEN_NOT_EXIST | How the target directory is handled | +| data_save_mode | string | no | APPEND_DATA | How existing data files are handled | ### host [string] @@ -227,6 +229,18 @@ Support writing Parquet INT96 from a 12-byte field, only valid for parquet files Only used when file_format_type is json,text,csv,xml. The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`. +### schema_save_mode [string] +How the target directory is handled. +- RECREATE_SCHEMA: create the dir if it does not exist; delete and recreate it if it already exists +- CREATE_SCHEMA_WHEN_NOT_EXIST: create the dir if it does not exist; leave it as is if it already exists +- ERROR_WHEN_SCHEMA_NOT_EXIST: report an error if the dir does not exist +- IGNORE: do not process the dir at all + +### data_save_mode [string] +How existing data files in the target directory are handled. +- DROP_DATA: keep the dir and delete its existing data files +- APPEND_DATA: keep the dir and its existing data files +- ERROR_WHEN_DATA_EXISTS: report an error if data files already exist ## Example For text file format simple config @@ -273,6 +287,34 @@ FtpFile { ``` +When the source produces multiple tables and each table should be written to its own directory, reference `${table_name}` in `path` as shown below + +```hocon + +FtpFile { + host = "xxx.xxx.xxx.xxx" + port = 21 + user = "username" + password = "password" + path = "/data/ftp/seatunnel/job1/${table_name}" + tmp_path = "/data/ftp/seatunnel/tmp" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + sink_columns = ["name","age"] + filename_time_format = "yyyy.MM.dd" + schema_save_mode=RECREATE_SCHEMA + data_save_mode=DROP_DATA +} + +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java new file mode 100644 index 0000000000..2bf0bf49e5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalog.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog; + +import org.apache.seatunnel.connectors.seatunnel.file.catalog.AbstractFileCatalog; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; + +public class FtpFileCatalog extends AbstractFileCatalog { + + public FtpFileCatalog( + HadoopFileSystemProxy hadoopFileSystemProxy, String filePath, String catalogName) { + super(hadoopFileSystemProxy, filePath, catalogName); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java new file mode 100644 index 0000000000..74f05c12d7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/catalog/FtpFileCatalogFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; +import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class FtpFileCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + HadoopFileSystemProxy fileSystemUtils = + new HadoopFileSystemProxy(FtpConf.buildWithConfig(options)); + return new FtpFileCatalog( + fileSystemUtils, + options.get(BaseSourceConfigOptions.FILE_PATH), + FileSystemType.FTP.getFileSystemPluginName()); + } + + @Override + public String factoryIdentifier() { + return FileSystemType.FTP.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java index 9186e1d8ee..bd98800c54 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java @@ -17,18 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode; import java.util.HashMap; +import java.util.Optional; public class FtpConf extends HadoopConf { private static final String HDFS_IMPL = "org.apache.seatunnel.connectors.seatunnel.file.ftp.system.SeaTunnelFTPFileSystem"; private static final String SCHEMA = "ftp"; - private FtpConf(String hdfsNameKey) { + public FtpConf(String hdfsNameKey) { super(hdfsNameKey); } @@ -42,20 +43,20 @@ public class FtpConf extends HadoopConf { return SCHEMA; } - public static HadoopConf buildWithConfig(Config config) { - String host = config.getString(FtpConfigOptions.FTP_HOST.key()); - int port = config.getInt(FtpConfigOptions.FTP_PORT.key()); + public static HadoopConf buildWithConfig(ReadonlyConfig config) { + String host = config.get(FtpConfigOptions.FTP_HOST); + int port = config.get(FtpConfigOptions.FTP_PORT); String defaultFS = String.format("ftp://%s:%s", host, port); HadoopConf hadoopConf = new FtpConf(defaultFS); HashMap<String, String> ftpOptions = new HashMap<>(); - ftpOptions.put( - "fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key())); - ftpOptions.put( - "fs.ftp.password." + host, config.getString(FtpConfigOptions.FTP_PASSWORD.key())); - if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) { + ftpOptions.put("fs.ftp.user." + host, config.get(FtpConfigOptions.FTP_USERNAME)); + ftpOptions.put("fs.ftp.password." 
+ host, config.get(FtpConfigOptions.FTP_PASSWORD)); + Optional<FtpConnectionMode> optional = + config.getOptional(FtpConfigOptions.FTP_CONNECTION_MODE); + if (optional.isPresent()) { ftpOptions.put( "fs.ftp.connection.mode", - config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key())); + config.get(FtpConfigOptions.FTP_CONNECTION_MODE).toString()); } hadoopConf.setExtraOptions(ftpOptions); return hadoopConf; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java index 031d442f20..f4b271e035 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java @@ -17,46 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf; -import 
org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions; -import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseMultipleTableFileSink; -@AutoService(SeaTunnelSink.class) -public class FtpFileSink extends BaseFileSink { +public class FtpFileSink extends BaseMultipleTableFileSink { @Override public String getPluginName() { return FileSystemType.FTP.getFileSystemPluginName(); } - @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - pluginConfig, - FtpConfigOptions.FTP_HOST.key(), - FtpConfigOptions.FTP_PORT.key(), - FtpConfigOptions.FTP_USERNAME.key(), - FtpConfigOptions.FTP_PASSWORD.key()); - if (!result.isSuccess()) { - throw new FileConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - super.prepare(pluginConfig); - hadoopConf = FtpConf.buildWithConfig(pluginConfig); + public FtpFileSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + super(FtpConf.buildWithConfig(readonlyConfig), readonlyConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 24a9ed48f8..cfd2351a5c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ 
-17,18 +17,27 @@ package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.factory.BaseMultipleTableFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState; import com.google.auto.service.AutoService; @AutoService(Factory.class) -public class FtpFileSinkFactory implements TableSinkFactory { +public class FtpFileSinkFactory extends BaseMultipleTableFileSinkFactory { @Override public String factoryIdentifier() { return FileSystemType.FTP.getFileSystemPluginName(); @@ -42,7 +51,11 @@ public class FtpFileSinkFactory implements TableSinkFactory { .required(FtpConfigOptions.FTP_PORT) .required(FtpConfigOptions.FTP_USERNAME) .required(FtpConfigOptions.FTP_PASSWORD) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) + .optional(BaseSinkConfig.TMP_PATH) .optional(BaseSinkConfig.FILE_FORMAT_TYPE) + .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) + 
.optional(BaseSinkConfig.DATA_SAVE_MODE) .conditional( BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, @@ -94,4 +107,12 @@ public class FtpFileSinkFactory implements TableSinkFactory { .optional(FtpConfigOptions.FTP_CONNECTION_MODE) .build(); } + + @Override + public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, FileAggregatedCommitInfo> + createSink(TableSinkFactoryContext context) { + ReadonlyConfig readonlyConfig = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new FtpFileSink(readonlyConfig, catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java index b032717cab..d6f0f64abb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; @@ -78,7 +79,7 @@ public class FtpFileSource extends BaseFileSource { "Ftp file source connector only support read [text, csv, json] files"); } String path = pluginConfig.getString(FtpConfigOptions.FILE_PATH.key()); - hadoopConf = FtpConf.buildWithConfig(pluginConfig); + hadoopConf = 
FtpConf.buildWithConfig(ReadonlyConfig.fromConfig(pluginConfig)); readStrategy = ReadStrategyFactory.of( pluginConfig.getString(FtpConfigOptions.FILE_FORMAT_TYPE.key())); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index 2a1598bf32..1b89a0bcc7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -25,17 +25,27 @@ import org.apache.seatunnel.e2e.common.container.TestHelper; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; +import org.apache.commons.lang3.StringUtils; + import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.github.dockerjava.core.command.ExecStartResultCallback; +import com.github.dockerjava.api.command.ExecCreateCmdResponse; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.stream.Stream; @DisabledOnContainer( @@ -143,6 +153,69 @@ public class FtpFileIT extends TestSuiteBase implements TestResource { helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf"); } + 
@TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.FLINK}, + disabledReason = "Flink dosen't support multi-table at now") + public void testMultipleTableAndSaveMode(TestContainer container) + throws IOException, InterruptedException { + TestHelper helper = new TestHelper(container); + // test mult table and save_mode:RECREATE_SCHEMA DROP_DATA + String homePath = "/home/vsftpd/seatunnel"; + String path1 = "/tmp/seatunnel_mult/text/source_1"; + String path2 = "/tmp/seatunnel_mult/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 0); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 0); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + helper.execute("/text/multiple_table_fake_to_ftp_file_text.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path1).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path2).size(), 1); + // test mult table and save_mode:CREATE_SCHEMA_WHEN_NOT_EXIST APPEND_DATA + String path3 = "/tmp/seatunnel_mult2/text/source_1"; + String path4 = "/tmp/seatunnel_mult2/text/source_2"; + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 0); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 0); + helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 1); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 1); + helper.execute("/text/multiple_table_fake_to_ftp_file_text_2.conf"); + Assertions.assertEquals(getFileListFromContainer(homePath + path3).size(), 2); + Assertions.assertEquals(getFileListFromContainer(homePath + path4).size(), 2); + } + + @SneakyThrows + private 
List<String> getFileListFromContainer(String path) { + String command = "ls -1 " + path; + ExecCreateCmdResponse execCreateCmdResponse = + dockerClient + .execCreateCmd(ftpContainer.getContainerId()) + .withCmd("sh", "-c", command) + .withAttachStdout(true) + .withAttachStderr(true) + .exec(); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + dockerClient + .execStartCmd(execCreateCmdResponse.getId()) + .exec(new ExecStartResultCallback(outputStream, System.err)) + .awaitCompletion(); + + String output = new String(outputStream.toByteArray(), StandardCharsets.UTF_8).trim(); + List<String> fileList = new ArrayList<>(); + log.info("container path file list is :{}", output); + String[] files = output.split("\n"); + for (String file : files) { + if (StringUtils.isNotEmpty(file)) { + log.info("container path file name is :{}", file); + fileList.add(file); + } + } + return fileList; + } + @AfterAll @Override public void tearDown() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf new file mode 100644 index 0000000000..cd28e54399 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "ftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +transform { +} + +sink { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/tmp/seatunnel_mult/text/${table_name}" + source_table_name = "ftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="RECREATE_SCHEMA" + "data_save_mode"="DROP_DATA" + } +} \ No 
newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf new file mode 100644 index 0000000000..e05a14ab86 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/text/multiple_table_fake_to_ftp_file_text_2.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "ftp" + tables_configs = [ + { + schema = { + table = "source_1" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW"] + } + ] + }, + { + schema = { + table = "source_2" + fields { + id = int + val_bool = boolean + val_tinyint = tinyint + val_smallint = smallint + val_int = int + val_bigint = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3] + } + ] + } + ] + } +} + +transform { +} + +sink { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/tmp/seatunnel_mult2/text/${table_name}" + source_table_name = "ftp" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST" + "data_save_mode"="APPEND_DATA" + } +} \ No newline at end of file