This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 74db1cbaac  [Feature][File] Support extract CSV files with different columns in different order (#9064)
74db1cbaac is described below

commit 74db1cbaac8e1b913c7959d5bdc63a087aad967a
Author: litiliu <38579068+liti...@users.noreply.github.com>
AuthorDate: Fri Mar 28 22:09:11 2025 +0800

    [Feature][File] Support extract CSV files with different columns in different order (#9064)
---
 docs/en/connector-v2/source/CosFile.md             |  5 ++
 docs/en/connector-v2/source/FtpFile.md             |  5 ++
 docs/en/connector-v2/source/HdfsFile.md            |  1 +
 docs/en/connector-v2/source/LocalFile.md           |  5 ++
 docs/en/connector-v2/source/OssFile.md             |  2 +
 docs/en/connector-v2/source/OssJindoFile.md        |  1 +
 docs/en/connector-v2/source/S3File.md              |  2 +
 docs/en/connector-v2/source/SftpFile.md            |  1 +
 docs/zh/connector-v2/source/CosFile.md             | 54 +++++++-------
 docs/zh/connector-v2/source/FtpFile.md             | 58 ++++++++-------
 docs/zh/connector-v2/source/HdfsFile.md            |  2 +
 .../file/config/BaseSourceConfigOptions.java       |  7 ++
 .../file/source/reader/CsvReadStrategy.java        | 42 ++++++++++-
 .../e2e/connector/file/local/LocalFileIT.java      |  9 +++
 .../src/test/resources/csv/csv_with_header1.csv    |  2 +
 .../src/test/resources/csv/csv_with_header2.csv    |  2 +
 .../resources/csv/csv_with_header_to_assert.conf   | 82 ++++++++++++++++++++++
 17 files changed, 228 insertions(+), 52 deletions(-)

diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md
index 6f1129adc4..92222e221f 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -66,6 +66,7 @@ To use this connector you need put hadoop-cos-{hadoop.version}-{version}.jar and
 | sheet_name                | string  | no       | -                   |
 | xml_row_tag               | string  | no       | -                   |
 | xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
 | file_filter_pattern       | string  | no       | -                   |
 | filename_extension        | string  | no       | -                   |
 | compress_codec            | string  | no       | none                |
@@ -274,6 +275,10 @@ Only need to be configured when file_format is xml.

 Specifies whether to process data using the tag attribute format.

+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file; only used when file_format is `csv` and the file contains a header line that matches RFC 4180.
+
 ### file_filter_pattern [string]

 Filter pattern, which is used for filtering files.
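For illustration, a minimal source config exercising the new option (a sketch only: the path and schema here are hypothetical, and the bucket/credential options CosFile requires are omitted; any file connector whose option table lists `csv_use_header_line` works the same way):

```hocon
source {
  CosFile {
    path = "/data/read/csv"
    file_format_type = "csv"
    # Parse each file by its RFC 4180 header line instead of by column position,
    # so files listing the same columns in different orders map onto one schema.
    csv_use_header_line = true
    schema = {
      fields {
        id = int
        name = string
      }
    }
    # bucket, secret_id, secret_key and region are required by CosFile
    # but omitted here for brevity.
  }
}
```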
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index de87cc9fda..f13b86fca6 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -60,6 +60,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | sheet_name                | string  | no       | -                   |
 | xml_row_tag               | string  | no       | -                   |
 | xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
 | file_filter_pattern       | string  | no       | -                   |
 | filename_extension        | string  | no       | -                   |
 | compress_codec            | string  | no       | none                |
@@ -317,6 +318,10 @@ Only need to be configured when file_format is xml.

 Specifies whether to process data using the tag attribute format.

+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file; only used when file_format is `csv` and the file contains a header line that matches RFC 4180.
+
 ### compress_codec [string]

 The compress codec of files; the supported details are as shown below:

diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 903398d829..c7cd5b8073 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -64,6 +64,7 @@ Read data from hdfs file system.
 | sheet_name          | string  | no | -     | Read the sheet of the workbook, only used when file_format is excel. |
 | xml_row_tag         | string  | no | -     | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. |
 | xml_use_attr_format | boolean | no | -     | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
+| csv_use_header_line | boolean | no | false | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | file_filter_pattern | string  | no |       | Filter pattern, which is used for filtering files. |
 | filename_extension  | string  | no | -     | Filter filename extension, which is used for filtering files with a specific extension. Example: `csv` `.txt` `json` `.xml`. |
 | compress_codec      | string  | no | none  | The compress codec of files |

diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index c88dda0c9b..408cfbfff3 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -61,6 +61,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | excel_engine        | string  | no | POI   |
 | xml_row_tag         | string  | no | -     |
 | xml_use_attr_format | boolean | no | -     |
+| csv_use_header_line | boolean | no | false |
 | file_filter_pattern | string  | no | -     |
 | filename_extension  | string  | no | -     |
 | compress_codec      | string  | no | none  |
@@ -265,6 +266,10 @@ Only need to be configured when file_format is xml.

 Specifies whether to process data using the tag attribute format.

+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file; only used when file_format is `csv` and the file contains a header line that matches RFC 4180.
+
 ### file_filter_pattern [string]

 Filter pattern, which is used for filtering files.
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index f6f061c1d6..bf19076c8c 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -194,10 +194,12 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
 | time_format            | string  | no | HH:mm:ss | Time type format, used to tell the connector how to convert a string to a time, supported as the following formats: `HH:mm:ss` `HH:mm:ss.SSS` |
 | filename_extension     | string  | no | -        | Filter filename extension, which is used for filtering files with a specific extension. Example: `csv` `.txt` `json` `.xml`. |
 | skip_header_row_number | long    | no | 0        | Skip the first few lines, but only for the txt and csv. For example, set like following: `skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files |
+| csv_use_header_line    | boolean | no | false    | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | schema                 | config  | no | -        | The schema of upstream data. |
 | sheet_name             | string  | no | -        | Read the sheet of the workbook, only used when file_format is excel. |
 | xml_row_tag            | string  | no | -        | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. |
 | xml_use_attr_format    | boolean | no | -        | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
+| csv_use_header_line    | boolean | no | false    | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | compress_codec         | string  | no | none     | Which compress codec the files used. |
 | encoding               | string  | no | UTF-8    | |
 | null_format            | string  | no | -        | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |

diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index b2d891d026..d173bfd7df 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -70,6 +70,7 @@ It only supports hadoop version **2.9.X+**.
 | sheet_name             | string  | no | -     |
 | xml_row_tag            | string  | no | -     |
 | xml_use_attr_format    | boolean | no | -     |
+| csv_use_header_line    | boolean | no | false |
 | file_filter_pattern    | string  | no |       |
 | compress_codec         | string  | no | none  |
 | archive_compress_codec | string  | no | none  |
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index a5e5270307..ae82ca1133 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -201,10 +201,12 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
 | datetime_format        | string  | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert a string to a datetime, supported as the following formats: `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` [...]
 | time_format            | string  | no | HH:mm:ss            | Time type format, used to tell the connector how to convert a string to a time, supported as the following formats: `HH:mm:ss` `HH:mm:ss.SSS` [...]
 | skip_header_row_number | long    | no | 0                   | Skip the first few lines, but only for the txt and csv. For example, set like following: `skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 lines from source files [...]
+| csv_use_header_line    | boolean | no | false               | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | schema                 | config  | no | -                   | The schema of upstream data. [...]
 | sheet_name             | string  | no | -                   | Read the sheet of the workbook, only used when file_format is excel. [...]
 | xml_row_tag            | string  | no | -                   | Specifies the tag name of the data rows within the XML file, only valid for XML files. [...]
 | xml_use_attr_format    | boolean | no | -                   | Specifies whether to process data using the tag attribute format, only valid for XML files. [...]
+| csv_use_header_line    | boolean | no | false               | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | compress_codec         | string  | no | none                | [...]
 | archive_compress_codec | string  | no | none                | [...]
 | encoding               | string  | no | UTF-8               | [...]

diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index c270a2516a..b6745cc681 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -93,6 +93,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
 | sheet_name             | String  | No | -     | Read the sheet of the workbook, only used when file_format is excel. |
 | xml_row_tag            | string  | no | -     | Specifies the tag name of the data rows within the XML file, only used when file_format is xml. |
 | xml_use_attr_format    | boolean | no | -     | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
+| csv_use_header_line    | boolean | no | false | Whether to use the header line to parse the file, only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
 | schema                 | Config  | No | -     | Please check #schema below |
 | compress_codec         | String  | No | None  | The compress codec of files and the details that are supported as shown below: <br/> - txt: `lzo` `None` <br/> - json: `lzo` `None` <br/> - csv: `lzo` `None` <br/> - orc: `lzo` `snappy` `lz4` `zlib` `None` <br/> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None` <br/> Tips: the excel type does not support any compression format |
 | archive_compress_codec | string  | no | none  |
diff --git a/docs/zh/connector-v2/source/CosFile.md b/docs/zh/connector-v2/source/CosFile.md
index 584ce02f45..b160eafecd 100644
--- a/docs/zh/connector-v2/source/CosFile.md
+++ b/docs/zh/connector-v2/source/CosFile.md
@@ -47,30 +47,31 @@ import ChangeLog from '../changelog/connector-file-cos.md';

 ## Options

-| Name                      | Type    | Required | Default             |
-|---------------------------|---------|----------|---------------------|
-| path                      | string  | yes      | -                   |
-| file_format_type          | string  | yes      | -                   |
-| bucket                    | string  | yes      | -                   |
-| secret_id                 | string  | yes      | -                   |
-| secret_key                | string  | yes      | -                   |
-| region                    | string  | yes      | -                   |
-| read_columns              | list    | yes      | -                   |
-| delimiter/field_delimiter | string  | no       | \001                |
-| parse_partition_from_path | boolean | no       | true                |
-| skip_header_row_number    | long    | no       | 0                   |
-| date_format               | string  | no       | yyyy-MM-dd          |
-| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
-| time_format               | string  | no       | HH:mm:ss            |
-| schema                    | config  | no       | -                   |
-| sheet_name                | string  | no       | -                   |
-| xml_row_tag               | string  | no       | -                   |
-| xml_use_attr_format       | boolean | no       | -                   |
-| file_filter_pattern       | string  | no       |                     |
-| compress_codec            | string  | no       | none                |
-| archive_compress_codec    | string  | no       | none                |
-| encoding                  | string  | no       | UTF-8               |
-| common-options            |         | no       | -                   |
+| Name                      | Type    | Required | Default             |
+|---------------------------|---------|----------|---------------------|
+| path                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
+| bucket                    | string  | yes      | -                   |
+| secret_id                 | string  | yes      | -                   |
+| secret_key                | string  | yes      | -                   |
+| region                    | string  | yes      | -                   |
+| read_columns              | list    | yes      | -                   |
+| delimiter/field_delimiter | string  | no       | \001                |
+| parse_partition_from_path | boolean | no       | true                |
+| skip_header_row_number    | long    | no       | 0                   |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| schema                    | config  | no       | -                   |
+| sheet_name                | string  | no       | -                   |
+| xml_row_tag               | string  | no       | -                   |
+| xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
+| file_filter_pattern       | string  | no       |                     |
+| compress_codec            | string  | no       | none                |
+| archive_compress_codec    | string  | no       | none                |
+| encoding                  | string  | no       | UTF-8               |
+| common-options            |         | no       | -                   |

 ### path [string]
@@ -271,6 +272,11 @@ default `HH:mm:ss`
 Only need to be configured when file_format is xml.

 Specifies whether to process data using the tag attribute format.

+### csv_use_header_line [boolean]
+
+Optional; configure it only when the file format is csv.
+Whether to use the header line to parse the file; the header line must match RFC 4180.
+
 ### file_filter_pattern [string]

 Filter pattern, used for filtering files.

diff --git a/docs/zh/connector-v2/source/FtpFile.md b/docs/zh/connector-v2/source/FtpFile.md
index 6aa1b9178d..c44a12c38c 100644
--- a/docs/zh/connector-v2/source/FtpFile.md
+++ b/docs/zh/connector-v2/source/FtpFile.md
@@ -39,32 +39,33 @@ import ChangeLog from '../changelog/connector-file-ftp.md';

 ## Options

-| Name                      | Type    | Required | Default             |
-|---------------------------|---------|----------|---------------------|
-| host                      | string  | yes      | -                   |
-| port                      | int     | yes      | -                   |
-| user                      | string  | yes      | -                   |
-| password                  | string  | yes      | -                   |
-| path                      | string  | yes      | -                   |
-| file_format_type          | string  | yes      | -                   |
-| connection_mode           | string  | no       | active_local        |
-| delimiter/field_delimiter | string  | no       | \001                |
-| read_columns              | list    | no       | -                   |
-| parse_partition_from_path | boolean | no       | true                |
-| date_format               | string  | no       | yyyy-MM-dd          |
-| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
-| time_format               | string  | no       | HH:mm:ss            |
-| skip_header_row_number    | long    | no       | 0                   |
-| schema                    | config  | no       | -                   |
-| sheet_name                | string  | no       | -                   |
-| xml_row_tag               | string  | no       | -                   |
-| xml_use_attr_format       | boolean | no       | -                   |
-| file_filter_pattern       | string  | no       | -                   |
-| compress_codec            | string  | no       | none                |
-| archive_compress_codec    | string  | no       | none                |
-| encoding                  | string  | no       | UTF-8               |
-| null_format               | string  | no       | -                   |
-| common-options            |         | no       | -                   |
+| Name                      | Type    | Required | Default             |
+|---------------------------|---------|----------|---------------------|
+| host                      | string  | yes      | -                   |
+| port                      | int     | yes      | -                   |
+| user                      | string  | yes      | -                   |
+| password                  | string  | yes      | -                   |
+| path                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
+| connection_mode           | string  | no       | active_local        |
+| delimiter/field_delimiter | string  | no       | \001                |
+| read_columns              | list    | no       | -                   |
+| parse_partition_from_path | boolean | no       | true                |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| skip_header_row_number    | long    | no       | 0                   |
+| schema                    | config  | no       | -                   |
+| sheet_name                | string  | no       | -                   |
+| xml_row_tag               | string  | no       | -                   |
+| xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
+| file_filter_pattern       | string  | no       | -                   |
+| compress_codec            | string  | no       | none                |
+| archive_compress_codec    | string  | no       | none                |
+| encoding                  | string  | no       | UTF-8               |
+| null_format               | string  | no       | -                   |
+| common-options            |         | no       | -                   |

 ### host [string]
@@ -313,6 +314,11 @@ SeaTunnel will skip the first 2 lines from the source files.

 Specifies whether to process data using the tag attribute format.

+### csv_use_header_line [boolean]
+
+Optional; configure it only when the file format is csv.
+Whether to use the header line to parse the file; the header line must match RFC 4180.
+
 ### compress_codec [string]

 The compress codec of files; the supported details are as follows:
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index cf9c7c3687..4fa6993748 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -64,6 +64,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 | sheet_name          | string  | no | -     | Read the sheet of the workbook, only used when the file format is excel. |
 | compress_codec      | string  | no | none  | The compress codec of files. |
 | common-options      |         | no | -     | Common parameters of the source plugin; see [Source Common Options](../../../en/connector-v2/source-common-options.md) for details. |
+| csv_use_header_line | boolean | no | false | Whether to use the header line to parse the file; only used when file_format is `csv` and the file contains a header line that matches RFC 4180 |
+

 ### delimiter/field_delimiter [string]

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index 3474538f74..b656f555e0 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -129,6 +129,13 @@ public class BaseSourceConfigOptions {
                     .defaultValue(0L)
                     .withDescription("The number of rows to skip");

+    public static final Option<Boolean> CSV_USE_HEADER_LINE =
+            Options.key("csv_use_header_line")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "whether to use the header line to parse the file, only used when the file_format is csv");
+
     public static final Option<List<String>> READ_PARTITIONS =
             Options.key("read_partitions")
                     .listType()
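As background for the reader changes below, a minimal standalone sketch of the Apache Commons CSV behavior the strategy relies on: `withFirstRecordAsHeader()` consumes the first record as the header and exposes it through `getHeaderNames()`, after which values are addressable by column name rather than position. The sample data mirrors the e2e fixture; the class name is hypothetical.

```java
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.StringReader;
import java.util.List;

public class FirstRecordAsHeaderDemo {
    public static void main(String[] args) throws Exception {
        // Same shape as the e2e fixture csv_with_header2.csv: columns out of schema order.
        String csv = "name,is_female,id\ntommy,false,30";
        CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader();
        try (CSVParser parser = new CSVParser(new StringReader(csv), format)) {
            List<String> headers = parser.getHeaderNames();
            System.out.println(headers); // [name, is_female, id]
            for (CSVRecord record : parser) {
                // Values are fetched by header name, independent of column position.
                System.out.println(record.get("id") + ", " + record.get("name"));
            }
        }
    }
}
```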
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index cc5cb8820e..4cc7fb2628 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -51,8 +51,10 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;

 @Slf4j
 public class CsvReadStrategy extends AbstractReadStrategy {
@@ -67,6 +69,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     private int[] indexes;
     private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
     private CatalogTable inputCatalogTable;
+    private boolean firstLineAsHeader = BaseSourceConfigOptions.CSV_USE_HEADER_LINE.defaultValue();

     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> output)
@@ -102,9 +105,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
         }
         CSVFormat csvFormat = CSVFormat.DEFAULT;
+        if (firstLineAsHeader) {
+            csvFormat = csvFormat.withFirstRecordAsHeader();
+        }
         try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(actualInputStream, encoding));
                 CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
+            // test and skip `\uFEFF` BOM
+            reader.mark(1);
+            int firstChar = reader.read();
+            if (firstChar != 0xFEFF) {
+                reader.reset();
+            }
+
             // skip lines
             for (int i = 0; i < skipHeaderNumber; i++) {
                 if (reader.readLine() == null) {
                     throw new IOException(
@@ -114,10 +127,18 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                 }
             }
             // read lines
+            List<String> headers = getHeaders(csvParser);
             for (CSVRecord csvRecord : csvParser) {
                 HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
-                for (int i = 0; i < inputCatalogTable.getTableSchema().getColumns().size(); i++) {
-                    fieldIdValueMap.put(i, csvRecord.get(i));
+                for (int i = 0; i < headers.size(); i++) {
+                    // the user input schema may not contain all the columns in the csv header
+                    // and may list columns in a different order from the csv header
+                    int index =
+                            inputCatalogTable.getSeaTunnelRowType().indexOf(headers.get(i), false);
+                    if (index == -1) {
+                        continue;
+                    }
+                    fieldIdValueMap.put(index, csvRecord.get(i));
                 }
                 SeaTunnelRow seaTunnelRow = deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
                 if (!readColumns.isEmpty()) {
@@ -152,6 +173,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
         }
     }

+    private List<String> getHeaders(CSVParser csvParser) {
+        List<String> headers;
+        if (firstLineAsHeader) {
+            headers = csvParser.getHeaderNames().stream().collect(Collectors.toList());
+        } else {
+            headers =
+                    inputCatalogTable.getTableSchema().getColumns().stream()
+                            .map(column -> column.getName())
+                            .collect(Collectors.toList());
+        }
+        return headers;
+    }
+
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
         this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
@@ -205,6 +239,10 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                         readonlyConfig
                                 .getOptional(BaseSourceConfigOptions.NULL_FORMAT)
                                 .orElse(null));
+        if (pluginConfig.hasPath(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key())) {
+            firstLineAsHeader =
+                    pluginConfig.getBoolean(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key());
+        }
         if (isMergePartition) {
             deserializationSchema =
                     builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
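The core of the change is the header-to-schema index mapping in the loop above. A self-contained sketch of that idea, with a plain string array standing in for the SeaTunnel schema and a hypothetical `indexOf` helper in place of `SeaTunnelRowType.indexOf(name, false)`:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderMappingSketch {
    // Hypothetical stand-in for SeaTunnelRowType.indexOf(fieldName, false):
    // returns the schema position of a field, or -1 if the schema omits it.
    static int indexOf(String[] schemaFields, String field) {
        return Arrays.asList(schemaFields).indexOf(field);
    }

    public static void main(String[] args) {
        String[] schema = {"id", "name", "is_female"};                    // user-declared order
        List<String> headers = Arrays.asList("name", "is_female", "id"); // order in the file
        List<String> record = Arrays.asList("tommy", "false", "30");

        // Map each CSV column to its schema index; columns the schema
        // does not declare are simply skipped, as in the hunk above.
        Map<Integer, String> fieldIdValueMap = new HashMap<>();
        for (int i = 0; i < headers.size(); i++) {
            int index = indexOf(schema, headers.get(i));
            if (index == -1) {
                continue;
            }
            fieldIdValueMap.put(index, record.get(i));
        }
        System.out.println(fieldIdValueMap); // {0=30, 1=tommy, 2=false}
    }
}
```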
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index fc69268584..63b94d82f6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -290,6 +290,14 @@ public class LocalFileIT extends TestSuiteBase {
                 "/csv/break_line.csv",
                 "/seatunnel/read/csv/break_line/break_line.csv",
                 container);
+        ContainerUtil.copyFileIntoContainers(
+                "/csv/csv_with_header1.csv",
+                "/seatunnel/read/csv/header/csv_with_header1.csv",
+                container);
+        ContainerUtil.copyFileIntoContainers(
+                "/csv/csv_with_header2.csv",
+                "/seatunnel/read/csv/header/csv_with_header2.csv",
+                container);

         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e_null_format.txt",
@@ -305,6 +313,7 @@ public class LocalFileIT extends TestSuiteBase {
         TestHelper helper = new TestHelper(container);
         helper.execute("/csv/fake_to_local_csv.conf");
         helper.execute("/csv/local_csv_to_assert.conf");
+        helper.execute("/csv/csv_with_header_to_assert.conf");
         helper.execute("/csv/breakline_csv_to_assert.conf");
         helper.execute("/excel/fake_to_local_excel.conf");
         helper.execute("/excel/local_excel_to_assert.conf");

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
new file mode 100644
index 0000000000..25b892a001
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
@@ -0,0 +1,2 @@
+name,id,is_female
+tom,20,true
\ No newline at end of file

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
new file mode 100644
index 0000000000..63c1d91e60
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
@@ -0,0 +1,2 @@
+name,is_female,id
+tommy,false,30
\ No newline at end of file

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
new file mode 100644
index 0000000000..b0aaa6d6b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/csv/header"
+    file_format_type = csv
+    csv_use_header_line = true
+    schema = {
+      fields {
+        id = int
+        name = string
+        is_female = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = id
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+        {
+          field_name = is_female
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
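For reference, under the schema in this job the two fixtures resolve to the same row layout despite their different column orders: csv_with_header1.csv yields (id=20, name=tom, is_female=true) and csv_with_header2.csv yields (id=30, name=tommy, is_female=false), which is why the Assert sink can require exactly two rows with non-null values in every field.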