This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 227a11f5aa [Fix][Connector-V2] User selects csv string pattern (#8572) 227a11f5aa is described below commit 227a11f5aa3b85a0273443d141be08d8c9cf18b9 Author: corgy-w <73771213+corg...@users.noreply.github.com> AuthorDate: Thu Feb 6 18:40:16 2025 +0800 [Fix][Connector-V2] User selects csv string pattern (#8572) --- docs/en/connector-v2/sink/CosFile.md | 11 +++- docs/en/connector-v2/sink/FtpFile.md | 14 +++++ docs/en/connector-v2/sink/HdfsFile.md | 9 ++++ docs/en/connector-v2/sink/LocalFile.md | 33 +++++++----- docs/en/connector-v2/sink/OssFile.md | 32 ++++++++---- docs/en/connector-v2/sink/OssJindoFile.md | 9 ++++ docs/en/connector-v2/sink/S3File.md | 43 ++++++---------- docs/en/connector-v2/sink/SftpFile.md | 25 +++++++-- docs/zh/connector-v2/sink/HdfsFile.md | 12 ++++- docs/zh/connector-v2/sink/LocalFile.md | 9 ++++ .../seatunnel/file/config/BaseSinkConfig.java | 7 +++ .../seatunnel/file/sink/config/FileSinkConfig.java | 14 +++++ .../file/sink/writer/CsvWriteStrategy.java | 4 ++ .../format/csv/CsvSerializationSchema.java | 40 ++++++++++++--- .../format/csv/constant/CsvStringQuoteMode.java | 38 ++++++++++++++ .../format/csv/CsvTextFormatSchemaTest.java | 59 +++++++++++++++++----- 16 files changed, 281 insertions(+), 78 deletions(-) diff --git a/docs/en/connector-v2/sink/CosFile.md b/docs/en/connector-v2/sink/CosFile.md index 2441306566..5b4adef262 100644 --- a/docs/en/connector-v2/sink/CosFile.md +++ b/docs/en/connector-v2/sink/CosFile.md @@ -59,6 +59,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. 
| | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | @@ -107,7 +108,7 @@ Only used when `custom_filename` is `true` When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows: -| Symbol | Description | +| Symbol | Description | |--------|--------------------| | y | Year | | M | Month | @@ -199,6 +200,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 47811bdd79..5bc3c33668 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -58,6 +58,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. 
| | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | @@ -207,6 +208,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. @@ -237,17 +246,22 @@ Only used when file_format_type is json,text,csv,xml. The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`. ### schema_save_mode [string] + Existing dir processing method. + - RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist - CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist - ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist - IGNORE :Ignore the treatment of the table ### data_save_mode [string] + Existing data processing method. 
+ - DROP_DATA: preserve dir and delete data files - APPEND_DATA: preserve dir, preserve data files - ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported + ## Example For text file format simple config diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index ae9479aa8f..48275e3297 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -65,6 +65,7 @@ Output data to hdfs file | common-options | object | no | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details [...] | max_rows_in_memory | int | no | - | Only used when file_format is excel.When File Format is Excel,The maximum number of data items that can be cached in the memory. [...] | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel.Writer the sheet of the workbook [...] +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. [...] | xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. [...] | xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file [...] | xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. [...] @@ -203,6 +204,14 @@ HdfsFile { Only used when file_format_type is text,csv.false:don't write header,true:write header. +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. 
If the escape character is not set, format validation throws an exception. + ### For compress simple config ``` diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 9c2141b61f..6cca6c0479 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -33,18 +33,18 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | -|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format_type | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format_type is text | -| row_delimiter | string | no | "\n" | Only used when file_format_type is text | -| have_partition | boolean | no | false | Whether you need processing partitions. 
| -| partition_by | array | no | - | Only used then have_partition is true | +| Name | Type | Required | Default | Description | +|---------------------------------------|---------|----------|------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format_type | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format_type is text | +| row_delimiter | string | no | "\n" | Only used when file_format_type is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | | partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | | is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | @@ -54,6 +54,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. 
| | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | @@ -181,6 +182,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index 55ef4f0935..8ef6cd3efd 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -42,7 +42,7 @@ If write to `csv`, `text` file type, All column will be string. ### Orc File Type -| SeaTunnel Data Type | Orc Data Type | +| SeaTunnel Data Type | Orc Data Type | |----------------------|-----------------------| | STRING | STRING | | BOOLEAN | BOOLEAN | @@ -64,7 +64,7 @@ If write to `csv`, `text` file type, All column will be string. ### Parquet File Type -| SeaTunnel Data Type | Parquet Data Type | +| SeaTunnel Data Type | Parquet Data Type | |----------------------|-----------------------| | STRING | STRING | | BOOLEAN | BOOLEAN | @@ -111,6 +111,7 @@ If write to `csv`, `text` file type, All column will be string. | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. 
| | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | @@ -252,6 +253,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. @@ -279,7 +288,8 @@ The encoding of the file to write. This param will be parsed by `Charset.forName ## How to Create an Oss Data Synchronization Jobs -The following example demonstrates how to create a data synchronization job that reads data from Fake Source and writes it to the Oss: +The following example demonstrates how to create a data synchronization job that reads data from Fake Source and writes +it to the Oss: For text file format with `have_partition` and `custom_filename` and `sink_columns` @@ -398,13 +408,15 @@ sink { } } ``` + ### enable_header_write [boolean] Only used when file_format_type is text,csv.false:don't write header,true:write header. ### Multiple Table -For extract source metadata from upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the path. 
+For extract source metadata from upstream, you can use `${database_name}`, `${table_name}` and `${schema_name}` in the +path. ```bash @@ -528,14 +540,16 @@ sink { - [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/seatunnel/pull/2980)) - [BugFix] Fix filesystem get error ([3117](https://github.com/apache/seatunnel/pull/3117)) -- [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/seatunnel/pull/3083)) +- [BugFix] Solved the bug of can not parse '\t' as delimiter from config + file ([3083](https://github.com/apache/seatunnel/pull/3083)) ### Next version -- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/seatunnel/pull/3258)) - - When field from upstream is null it will throw NullPointerException - - Sink columns mapping failed - - When restore writer from states getting transaction directly failed +- [BugFix] Fixed the following bugs that failed to write data to + files ([3258](https://github.com/apache/seatunnel/pull/3258)) + - When field from upstream is null it will throw NullPointerException + - Sink columns mapping failed + - When restore writer from states getting transaction directly failed - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/seatunnel/pull/3625)) - [Improve] Support file compress ([3899](https://github.com/apache/seatunnel/pull/3899)) diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index 21fe05359e..5507b8664e 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -63,6 +63,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. 
| +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. | @@ -203,6 +204,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index b6fbc4ef4e..7c82bb3659 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -49,7 +49,7 @@ If write to `csv`, `text` file type, All column will be string. ### Orc File Type -| SeaTunnel Data type | Orc Data type | +| SeaTunnel Data type | Orc Data type | |----------------------|-----------------------| | STRING | STRING | | BOOLEAN | BOOLEAN | @@ -71,7 +71,7 @@ If write to `csv`, `text` file type, All column will be string. ### Parquet File Type -| SeaTunnel Data type | Parquet Data type | +| SeaTunnel Data type | Parquet Data type | |----------------------|-----------------------| | STRING | STRING | | BOOLEAN | BOOLEAN | @@ -119,6 +119,7 @@ If write to `csv`, `text` file type, All column will be string. 
| common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml, specifies the tag name of the root element within the XML file. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml, specifies the tag name of the data rows within the XML file | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml, specifies Whether to process data using the tag attribute format. | @@ -258,6 +259,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. @@ -378,7 +387,8 @@ sink { } ``` -For text file format with `have_partition` and `custom_filename` and `sink_columns` and `com.amazonaws.auth.InstanceProfileCredentialsProvider` +For text file format with `have_partition` and `custom_filename` and `sink_columns` +and `com.amazonaws.auth.InstanceProfileCredentialsProvider` ```hocon @@ -491,30 +501,7 @@ sink { } } ``` -### enable_header_write [boolean] - -Only used when file_format_type is text,csv.false:don't write header,true:write header. 
- -## Changelog -### 2.3.0-beta 2022-10-20 - -- Add S3File Sink Connector - -### 2.3.0 2022-12-30 - -- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/seatunnel/pull/3258)) - - When field from upstream is null it will throw NullPointerException - - Sink columns mapping failed - - When restore writer from states getting transaction directly failed -- [Feature] Support S3A protocol ([3632](https://github.com/apache/seatunnel/pull/3632)) - - Allow user to add additional hadoop-s3 parameters - - Allow the use of the s3a protocol - - Decouple hadoop-aws dependencies -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/seatunnel/pull/3625)) -- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/seatunnel/pull/)) - -### Next version - -- [Improve] Support file compress ([3899](https://github.com/apache/seatunnel/pull/3899)) +### enable_header_write [boolean] +Only used when file_format_type is text,csv.false:don't write header,true:write header. \ No newline at end of file diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index 4cde1eb866..612d8c36d5 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -57,6 +57,7 @@ By default, we use 2PC commit to ensure `exactly-once` | common-options | object | no | - | | | max_rows_in_memory | int | no | - | Only used when file_format_type is excel. | | sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. | +| csv_string_quote_mode | enum | no | MINIMAL | Only used when file_format is csv. | | xml_root_tag | string | no | RECORDS | Only used when file_format is xml. | | xml_row_tag | string | no | RECORD | Only used when file_format is xml. | | xml_use_attr_format | boolean | no | - | Only used when file_format is xml. 
| @@ -200,6 +201,14 @@ When File Format is Excel,The maximum number of data items that can be cached in Writer the sheet of the workbook +### csv_string_quote_mode [string] + +When File Format is CSV,The string quote mode of CSV. + +- ALL: All String fields will be quoted. +- MINIMAL: Quotes fields which contain special characters such as a the field delimiter, quote character or any of the characters in the line separator string. +- NONE: Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the escape character. If the escape character is not set, format validation throws an exception. + ### xml_root_tag [string] Specifies the tag name of the root element within the XML file. @@ -230,14 +239,18 @@ Only used when file_format_type is json,text,csv,xml. The encoding of the file to write. This param will be parsed by `Charset.forName(encoding)`. ### schema_save_mode [string] + Existing dir processing method. + - RECREATE_SCHEMA: will create when the dir does not exist, delete and recreate when the dir is exist - CREATE_SCHEMA_WHEN_NOT_EXIST: will create when the dir does not exist, skipped when the dir is exist - ERROR_WHEN_SCHEMA_NOT_EXIST: error will be reported when the dir does not exist - IGNORE :Ignore the treatment of the table ### data_save_mode [string] + Existing data processing method. 
+ - DROP_DATA: preserve dir and delete data files - APPEND_DATA: preserve dir, preserve data files - ERROR_WHEN_DATA_EXISTS: when there is data files, an error is reported @@ -271,7 +284,8 @@ SftpFile { ``` -When our source end is multiple tables, and wants different expressions to different directory, we can configure this way +When our source end is multiple tables, and wants different expressions to different directory, we can configure this +way ```hocon SftpFile { @@ -305,10 +319,11 @@ SftpFile { ### 2.3.0 2022-12-30 - Add SftpFile Sink Connector -- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/seatunnel/pull/3258)) - - When field from upstream is null it will throw NullPointerException - - Sink columns mapping failed - - When restore writer from states getting transaction directly failed +- [BugFix] Fixed the following bugs that failed to write data to + files ([3258](https://github.com/apache/seatunnel/pull/3258)) + - When field from upstream is null it will throw NullPointerException + - Sink columns mapping failed + - When restore writer from states getting transaction directly failed - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/seatunnel/pull/3625)) ### Next version diff --git a/docs/zh/connector-v2/sink/HdfsFile.md b/docs/zh/connector-v2/sink/HdfsFile.md index 4561eb1572..939ad4adb2 100644 --- a/docs/zh/connector-v2/sink/HdfsFile.md +++ b/docs/zh/connector-v2/sink/HdfsFile.md @@ -30,7 +30,7 @@ ## 支持的数据源信息 -| 数据源 | 支持的版本 | +| 数据源 | 支持的版本 | |--------|------------------| | Hdfs文件 | hadoop 2.x 和 3.x | @@ -63,6 +63,7 @@ | kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径 | | compress_codec | string | 否 | none | 压缩编解码器 | | common-options | object | 否 | - | 接收器插件通用参数,请参阅 [接收器通用选项](../sink-common-options.md) 了解详情 | +| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在文件格式为 CSV 时使用。 | | enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 
text,csv 时使用。<br/> false:不写入表头,true:写入表头。 | | max_rows_in_memory | int | 否 | - | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。 | | sheet_name | string | 否 | Sheet${Random number} | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名 | @@ -181,10 +182,19 @@ HdfsFile { is_enable_transaction = true } ``` + ### enable_header_write [boolean] 仅在 file_format_type 为 text,csv 时使用。false:不写入表头,true:写入表头。 +### csv_string_quote_mode [string] + +当文件格式为 CSV 时,CSV 的字符串引号模式。 + +- ALL:所有字符串字段都会加引号。 +- MINIMAL:仅为包含特殊字符(如字段分隔符、引号字符或行分隔符字符串中的任何字符)的字段加引号。 +- NONE:从不为字段加引号。当数据中包含分隔符时,输出会在前面加上转义字符。如果未设置转义字符,则格式验证会抛出异常。 + ### kerberos 的简单配置 ``` diff --git a/docs/zh/connector-v2/sink/LocalFile.md b/docs/zh/connector-v2/sink/LocalFile.md index 13cfd3cfbf..fe6008b10d 100644 --- a/docs/zh/connector-v2/sink/LocalFile.md +++ b/docs/zh/connector-v2/sink/LocalFile.md @@ -55,6 +55,7 @@ | common-options | object | 否 | - | 常见选项 | | max_rows_in_memory | int | 否 | - | 仅在 file_format_type 为 excel 时使用 | | sheet_name | string | 否 | Sheet${随机数} | 仅在 file_format_type 为 excel 时使用 | +| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在文件格式为 CSV 时使用。 | | xml_root_tag | string | 否 | RECORDS | 仅在 file_format 为 xml 时使用 | | xml_row_tag | string | 否 | RECORD | 仅在 file_format 为 xml 时使用 | | xml_use_attr_format | boolean | 否 | - | 仅在 file_format 为 xml 时使用 | @@ -176,6 +177,14 @@ Sink 插件的常见参数,请参阅 [Sink 常见选项](../sink-common-option 工作簿的表名。 +### csv_string_quote_mode [string] + +当文件格式为 CSV 时,CSV 的字符串引号模式。 + +- ALL:所有字符串字段都会加引号。 +- MINIMAL:仅为包含特殊字符(如字段分隔符、引号字符或行分隔符字符串中的任何字符)的字段加引号。 +- NONE:从不为字段加引号。当数据中包含分隔符时,输出会在前面加上转义字符。如果未设置转义字符,则格式验证会抛出异常。 + ### xml _root_tag [string] diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index d2d3c4d0cd..cc8f99644a 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode; import org.apache.seatunnel.format.text.constant.TextFormatConstant; import java.util.Arrays; @@ -318,4 +319,10 @@ public class BaseSinkConfig extends KerberosConfig { .defaultValue(APPEND_DATA) .withDescription( "Before the synchronization task begins, different processing of data files that already exist in the directory"); + + public static final Option<CsvStringQuoteMode> CSV_STRING_QUOTE_MODE = + Options.key("csv_string_quote_mode") + .enumType(CsvStringQuoteMode.class) + .defaultValue(CsvStringQuoteMode.MINIMAL) + .withDescription("CSV file string quote mode, only valid for csv files"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java index d9315a6f5c..c2e42549b6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -86,6 +87,9 @@ public class FileSinkConfig extends BaseFileSinkConfig implements PartitionConfi private List<String> parquetAvroWriteFixedAsInt96 = BaseSinkConfig.PARQUET_AVRO_WRITE_FIXED_AS_INT96.defaultValue(); + private CsvStringQuoteMode csvStringQuoteMode = + BaseSinkConfig.CSV_STRING_QUOTE_MODE.defaultValue(); + public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) { super(config); checkArgument( @@ -239,5 +243,15 @@ public class FileSinkConfig extends BaseFileSinkConfig implements PartitionConfi BaseSinkConfig.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key()); } } + + if (FileFormat.CSV + .name() + .equalsIgnoreCase(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) { + if (config.hasPath(BaseSinkConfig.CSV_STRING_QUOTE_MODE.key())) { + this.csvStringQuoteMode = + CsvStringQuoteMode.valueOf( + config.getString(BaseSinkConfig.CSV_STRING_QUOTE_MODE.key())); + } + } } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/CsvWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/CsvWriteStrategy.java index be5536eb9b..0da0c7ef9a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/CsvWriteStrategy.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/CsvWriteStrategy.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.csv.CsvSerializationSchema; +import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode; import org.apache.hadoop.fs.FSDataOutputStream; @@ -54,10 +55,12 @@ public class CsvWriteStrategy extends AbstractWriteStrategy<FSDataOutputStream> private final FileFormat fileFormat; private final Boolean enableHeaderWriter; private final Charset charset; + private final CsvStringQuoteMode csvStringQuoteMode; private SerializationSchema serializationSchema; public CsvWriteStrategy(FileSinkConfig fileSinkConfig) { super(fileSinkConfig); + this.csvStringQuoteMode = fileSinkConfig.getCsvStringQuoteMode(); this.beingWrittenOutputStream = new LinkedHashMap<>(); this.isFirstWrite = new HashMap<>(); this.fieldDelimiter = fileSinkConfig.getFieldDelimiter(); @@ -83,6 +86,7 @@ public class CsvWriteStrategy extends AbstractWriteStrategy<FSDataOutputStream> .dateTimeFormatter(dateTimeFormat) .timeFormatter(timeFormat) .charset(charset) + .quoteMode(csvStringQuoteMode) .build(); } diff --git a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvSerializationSchema.java b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvSerializationSchema.java index 7bdbcc6488..e539f9d932 100644 --- a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvSerializationSchema.java +++ b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvSerializationSchema.java @@ -28,6 +28,7 @@ import 
org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; import org.apache.seatunnel.format.csv.constant.CsvFormatConstant; +import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode; import org.apache.seatunnel.format.csv.exception.SeaTunnelCsvFormatException; import org.apache.commons.csv.CSVFormat; @@ -55,6 +56,7 @@ public class CsvSerializationSchema implements SerializationSchema { private final TimeUtils.Formatter timeFormatter; private final Charset charset; private final String nullValue; + private final CsvStringQuoteMode quoteMode; private CsvSerializationSchema( @NonNull SeaTunnelRowType seaTunnelRowType, @@ -63,7 +65,8 @@ public class CsvSerializationSchema implements SerializationSchema { DateTimeUtils.Formatter dateTimeFormatter, TimeUtils.Formatter timeFormatter, Charset charset, - String nullValue) { + String nullValue, + CsvStringQuoteMode quoteMode) { this.seaTunnelRowType = seaTunnelRowType; this.separators = separators; this.dateFormatter = dateFormatter; @@ -71,6 +74,7 @@ public class CsvSerializationSchema implements SerializationSchema { this.timeFormatter = timeFormatter; this.charset = charset; this.nullValue = nullValue; + this.quoteMode = quoteMode; } public static Builder builder() { @@ -86,6 +90,7 @@ public class CsvSerializationSchema implements SerializationSchema { private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; private Charset charset = StandardCharsets.UTF_8; private String nullValue = ""; + private CsvStringQuoteMode quoteMode = CsvStringQuoteMode.MINIMAL; private Builder() {} @@ -129,6 +134,11 @@ public class CsvSerializationSchema implements SerializationSchema { return this; } + public Builder quoteMode(CsvStringQuoteMode quoteMode) { + this.quoteMode = quoteMode; + return this; + } + public CsvSerializationSchema build() { return new CsvSerializationSchema( seaTunnelRowType, @@ -137,7 +147,8 @@ 
public class CsvSerializationSchema implements SerializationSchema { dateTimeFormatter, timeFormatter, charset, - nullValue); + nullValue, + quoteMode); } } @@ -225,12 +236,25 @@ public class CsvSerializationSchema implements SerializationSchema { } private String addQuotesUsingCSVFormat(String fieldValue) { - CSVFormat format = - CSVFormat.DEFAULT - .builder() - .setQuoteMode(QuoteMode.MINIMAL) - .setRecordSeparator("") - .build(); + CSVFormat.Builder builder = CSVFormat.DEFAULT.builder().setRecordSeparator(""); + switch (quoteMode) { + case ALL: + builder.setQuoteMode(QuoteMode.ALL); + break; + case MINIMAL: + builder.setQuoteMode(QuoteMode.MINIMAL); + break; + case NONE: + builder.setQuoteMode(QuoteMode.NONE); + break; + default: + throw new SeaTunnelCsvFormatException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + String.format( + "SeaTunnel format csv not supported for parsing this type [%s]", + quoteMode)); + } + CSVFormat format = builder.build(); StringWriter stringWriter = new StringWriter(); try (CSVPrinter printer = new CSVPrinter(stringWriter, format)) { printer.printRecord(fieldValue); diff --git a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/constant/CsvStringQuoteMode.java b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/constant/CsvStringQuoteMode.java new file mode 100644 index 0000000000..6c2c9ec377 --- /dev/null +++ b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/constant/CsvStringQuoteMode.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.format.csv.constant; + +import java.io.Serializable; + +/** @see org.apache.commons.csv.QuoteMode */ +public enum CsvStringQuoteMode implements Serializable { + /** Quotes all fields. */ + ALL, + + /** + * Quotes fields which contain special characters such as the field delimiter, quote character, + * or any of the characters in the line separator string. + */ + MINIMAL, + + /** + * Never quotes fields. When the delimiter occurs in data, the printer prefixes it with the + * escape character. If the escape character is not set, format validation throws an exception. 
+ */ + NONE +} diff --git a/seatunnel-formats/seatunnel-format-csv/src/test/java/org/apache/seatunnel/format/csv/CsvTextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-csv/src/test/java/org/apache/seatunnel/format/csv/CsvTextFormatSchemaTest.java index a20a56434b..3bcd9a53c5 100644 --- a/seatunnel-formats/seatunnel-format-csv/src/test/java/org/apache/seatunnel/format/csv/CsvTextFormatSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-csv/src/test/java/org/apache/seatunnel/format/csv/CsvTextFormatSchemaTest.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.utils.DateTimeUtils.Formatter; +import org.apache.seatunnel.format.csv.constant.CsvStringQuoteMode; import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor; import org.junit.jupiter.api.Assertions; @@ -46,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class CsvTextFormatSchemaTest { public String content = "\"mess,age\"," + + "\"message\"," + "true," + "1," + "2," @@ -88,7 +90,8 @@ public class CsvTextFormatSchemaTest { seaTunnelRowType = new SeaTunnelRowType( new String[] { - "string_field", + "string_field1", + "string_field2", "boolean_field", "tinyint_field", "smallint_field", @@ -106,6 +109,7 @@ public class CsvTextFormatSchemaTest { "map_field" }, new SeaTunnelDataType<?>[] { + BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.BOOLEAN_TYPE, BasicType.BYTE_TYPE, @@ -146,26 +150,53 @@ public class CsvTextFormatSchemaTest { .seaTunnelRowType(seaTunnelRowType) .dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS) .delimiter(",") + .quoteMode(CsvStringQuoteMode.MINIMAL) + .build(); + + CsvSerializationSchema csvSerializationSchemaWithAllQuotes = + CsvSerializationSchema.builder() + .seaTunnelRowType(seaTunnelRowType) + 
.dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS) + .delimiter(",") + .quoteMode(CsvStringQuoteMode.ALL) + .build(); + + CsvSerializationSchema csvSerializationSchemaWithNoneQuotes = + CsvSerializationSchema.builder() + .seaTunnelRowType(seaTunnelRowType) + .dateTimeFormatter(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSS) + .delimiter(",") + .quoteMode(CsvStringQuoteMode.NONE) .build(); SeaTunnelRow seaTunnelRow = deserializationSchema.deserialize(content.getBytes()); Assertions.assertEquals("mess,age", seaTunnelRow.getField(0)); - Assertions.assertEquals(Boolean.TRUE, seaTunnelRow.getField(1)); - Assertions.assertEquals(Byte.valueOf("1"), seaTunnelRow.getField(2)); - Assertions.assertEquals(Short.valueOf("2"), seaTunnelRow.getField(3)); - Assertions.assertEquals(Integer.valueOf("3"), seaTunnelRow.getField(4)); - Assertions.assertEquals(Long.valueOf("4"), seaTunnelRow.getField(5)); - Assertions.assertEquals(Float.valueOf("6.66"), seaTunnelRow.getField(6)); - Assertions.assertEquals(Double.valueOf("7.77"), seaTunnelRow.getField(7)); - Assertions.assertEquals(BigDecimal.valueOf(8.8888888D), seaTunnelRow.getField(8)); - Assertions.assertNull((seaTunnelRow.getField(9))); - Assertions.assertEquals(LocalDate.of(2022, 9, 24), seaTunnelRow.getField(10)); - Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("tyrantlucifer"), 18); - Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(15))).get("Kris"), 21); + Assertions.assertEquals(Boolean.TRUE, seaTunnelRow.getField(2)); + Assertions.assertEquals(Byte.valueOf("1"), seaTunnelRow.getField(3)); + Assertions.assertEquals(Short.valueOf("2"), seaTunnelRow.getField(4)); + Assertions.assertEquals(Integer.valueOf("3"), seaTunnelRow.getField(5)); + Assertions.assertEquals(Long.valueOf("4"), seaTunnelRow.getField(6)); + Assertions.assertEquals(Float.valueOf("6.66"), seaTunnelRow.getField(7)); + Assertions.assertEquals(Double.valueOf("7.77"), seaTunnelRow.getField(8)); + 
Assertions.assertEquals(BigDecimal.valueOf(8.8888888D), seaTunnelRow.getField(9)); + Assertions.assertNull((seaTunnelRow.getField(10))); + Assertions.assertEquals(LocalDate.of(2022, 9, 24), seaTunnelRow.getField(11)); + Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(16))).get("tyrantlucifer"), 18); + Assertions.assertEquals(((Map<?, ?>) (seaTunnelRow.getField(16))).get("Kris"), 21); byte[] serialize = csvSerializationSchema.serialize(seaTunnelRow); Assertions.assertEquals( - "\"mess,age\",true,1,2,3,4,6.66,7.77,8.8888888,,2022-09-24,22:45:00,2022-09-24 22:45:00.000000,1\u00032\u00033\u00034\u00035\u00036\u0002tyrantlucifer\u000418\u0003Kris\u000421,1\u00022\u00023\u00024\u00025\u00026,tyrantlucifer\u000318\u0002Kris\u000321\u0002nullValueKey\u0003\u0002\u00031231", + "\"mess,age\",message,true,1,2,3,4,6.66,7.77,8.8888888,,2022-09-24,22:45:00,2022-09-24 22:45:00.000000,1\u00032\u00033\u00034\u00035\u00036\u0002tyrantlucifer\u000418\u0003Kris\u000421,1\u00022\u00023\u00024\u00025\u00026,tyrantlucifer\u000318\u0002Kris\u000321\u0002nullValueKey\u0003\u0002\u00031231", new String(serialize)); + + byte[] serialize1 = csvSerializationSchemaWithAllQuotes.serialize(seaTunnelRow); + Assertions.assertEquals( + "\"mess,age\",\"message\",true,1,2,3,4,6.66,7.77,8.8888888,,2022-09-24,22:45:00,2022-09-24 22:45:00.000000,1\u00032\u00033\u00034\u00035\u00036\u0002tyrantlucifer\u000418\u0003Kris\u000421,1\u00022\u00023\u00024\u00025\u00026,tyrantlucifer\u000318\u0002Kris\u000321\u0002nullValueKey\u0003\u0002\u00031231", + new String(serialize1)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> { + csvSerializationSchemaWithNoneQuotes.serialize(seaTunnelRow); + }); } @Test