This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 74db1cbaac [Feature][File] Support extract CSV files with different 
columns in different order (#9064)
74db1cbaac is described below

commit 74db1cbaac8e1b913c7959d5bdc63a087aad967a
Author: litiliu <38579068+liti...@users.noreply.github.com>
AuthorDate: Fri Mar 28 22:09:11 2025 +0800

    [Feature][File] Support extract CSV files with different columns in 
different order (#9064)
---
 docs/en/connector-v2/source/CosFile.md             |  5 ++
 docs/en/connector-v2/source/FtpFile.md             |  5 ++
 docs/en/connector-v2/source/HdfsFile.md            |  1 +
 docs/en/connector-v2/source/LocalFile.md           |  5 ++
 docs/en/connector-v2/source/OssFile.md             |  2 +
 docs/en/connector-v2/source/OssJindoFile.md        |  1 +
 docs/en/connector-v2/source/S3File.md              |  2 +
 docs/en/connector-v2/source/SftpFile.md            |  1 +
 docs/zh/connector-v2/source/CosFile.md             | 54 +++++++-------
 docs/zh/connector-v2/source/FtpFile.md             | 58 ++++++++-------
 docs/zh/connector-v2/source/HdfsFile.md            |  2 +
 .../file/config/BaseSourceConfigOptions.java       |  7 ++
 .../file/source/reader/CsvReadStrategy.java        | 42 ++++++++++-
 .../e2e/connector/file/local/LocalFileIT.java      |  9 +++
 .../src/test/resources/csv/csv_with_header1.csv    |  2 +
 .../src/test/resources/csv/csv_with_header2.csv    |  2 +
 .../resources/csv/csv_with_header_to_assert.conf   | 82 ++++++++++++++++++++++
 17 files changed, 228 insertions(+), 52 deletions(-)

diff --git a/docs/en/connector-v2/source/CosFile.md 
b/docs/en/connector-v2/source/CosFile.md
index 6f1129adc4..92222e221f 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -66,6 +66,7 @@ To use this connector you need put 
hadoop-cos-{hadoop.version}-{version}.jar and
 | sheet_name                | string  | no       | -                   |
 | xml_row_tag               | string  | no       | -                   |
 | xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
 | file_filter_pattern       | string  | no       | -                   |
 | filename_extension            | string  | no       | -                   |
 | compress_codec            | string  | no       | none                |
@@ -274,6 +275,10 @@ Only need to be configured when file_format is xml.
 
 Specifies Whether to process data using the tag attribute format.
 
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180
+
 ### file_filter_pattern [string]
 
 Filter pattern, which used for filtering files.
diff --git a/docs/en/connector-v2/source/FtpFile.md 
b/docs/en/connector-v2/source/FtpFile.md
index de87cc9fda..f13b86fca6 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -60,6 +60,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | sheet_name                | string  | no       | -                   |
 | xml_row_tag               | string  | no       | -                   |
 | xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | -                   |
 | file_filter_pattern       | string  | no       | -                   |
 | filename_extension        | string  | no       | -                   |
 | compress_codec            | string  | no       | none                |
@@ -317,6 +318,10 @@ Only need to be configured when file_format is xml.
 
 Specifies Whether to process data using the tag attribute format.
 
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180
+
 ### compress_codec [string]
 
 The compress codec of files and the details that supported as the following 
shown:
diff --git a/docs/en/connector-v2/source/HdfsFile.md 
b/docs/en/connector-v2/source/HdfsFile.md
index 903398d829..c7cd5b8073 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -64,6 +64,7 @@ Read data from hdfs file system.
 | sheet_name                | string  | no       | -                   | 
Reader the sheet of the workbook,Only used when file_format is excel.           
                                                                                
                                                                                
                                                                                
              |
 | xml_row_tag               | string  | no       | -                   | 
Specifies the tag name of the data rows within the XML file, only used when 
file_format is xml.                                                             
                                                                                
                                                                                
                  |
 | xml_use_attr_format       | boolean | no       | -                   | 
Specifies whether to process data using the tag attribute format, only used 
when file_format is xml.                                                        
                                                                                
                                                                                
                  |
+| csv_use_header_line       | boolean | no       | false               | 
Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180  
                                                                                
                                                                                
                         |
 | file_filter_pattern       | string  | no       |                     | 
Filter pattern, which used for filtering files.                                 
                                                                                
                                                                                
                                                                                
              |
 | filename_extension        | string  | no       | -                   | 
Filter filename extension, which used for filtering files with specific 
extension. Example: `csv` `.txt` `json` `.xml`.                                 
                                                                                
                                                                                
                      |
 | compress_codec            | string  | no       | none                | The 
compress codec of files                                                         
                                                                                
                                                                                
                                                                                
          |
diff --git a/docs/en/connector-v2/source/LocalFile.md 
b/docs/en/connector-v2/source/LocalFile.md
index c88dda0c9b..408cfbfff3 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -61,6 +61,7 @@ If you use SeaTunnel Engine, It automatically integrated the 
hadoop jar when you
 | excel_engine              | string  | no       | POI                         
         |                                             
 | xml_row_tag               | string  | no       | -                           
         |
 | xml_use_attr_format       | boolean | no       | -                           
         |
+| csv_use_header_line       | boolean | no       | false                       
         |
 | file_filter_pattern       | string  | no       | -                           
         |
 | filename_extension            | string  | no       | -                       
             |
 | compress_codec            | string  | no       | none                        
         |
@@ -265,6 +266,10 @@ Only need to be configured when file_format is xml.
 
 Specifies Whether to process data using the tag attribute format.
 
+### csv_use_header_line [boolean]
+
+Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180
+
 ### file_filter_pattern [string]
 
 Filter pattern, which used for filtering files.
diff --git a/docs/en/connector-v2/source/OssFile.md 
b/docs/en/connector-v2/source/OssFile.md
index f6f061c1d6..bf19076c8c 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -194,10 +194,12 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | time_format               | string  | no       | HH:mm:ss            | Time 
type format, used to tell connector how to convert string to time, supported as 
the following formats:`HH:mm:ss` `HH:mm:ss.SSS`                                 
                                                                                
                                                                               |
 | filename_extension        | string  | no       | -                   | 
Filter filename extension, which used for filtering files with specific 
extension. Example: `csv` `.txt` `json` `.xml`.                                 
                                                                                
                                                                                
            |
 | skip_header_row_number    | long    | no       | 0                   | Skip 
the first few lines, but only for the txt and csv. For example, set like 
following:`skip_header_row_number = 2`. Then SeaTunnel will skip the first 2 
lines from source files                                                         
                                                                                
         |
+| csv_use_header_line       | boolean | no       | false               | 
Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180  
                                                                                
                                                                                
                       |
 | schema                    | config  | no       | -                   | The 
schema of upstream data.                                                        
                                                                                
                                                                                
                                                                                
|
 | sheet_name                | string  | no       | -                   | 
Reader the sheet of the workbook,Only used when file_format is excel.           
                                                                                
                                                                                
                                                                                
    |
 | xml_row_tag               | string  | no       | -                   | 
Specifies the tag name of the data rows within the XML file, only used when 
file_format is xml.                                                             
                                                                                
                                                                                
        |
 | xml_use_attr_format       | boolean | no       | -                   | 
Specifies whether to process data using the tag attribute format, only used 
when file_format is xml.                                                        
                                                                                
                                                                                
        |
+| csv_use_header_line       | boolean | no       | false               | 
Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180  
                                                                                
                                                                                
                       |
 | compress_codec            | string  | no       | none                | Which 
compress codec the files used.                                                  
                                                                                
                                                                                
                                                                              |
 | encoding                  | string  | no       | UTF-8               |
 | null_format               | string  | no       | -                   | Only 
used when file_format_type is text. null_format to define which strings can be 
represented as null. e.g: `\N`                                                  
                                                                                
                                                                                
|
diff --git a/docs/en/connector-v2/source/OssJindoFile.md 
b/docs/en/connector-v2/source/OssJindoFile.md
index b2d891d026..d173bfd7df 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -70,6 +70,7 @@ It only supports hadoop version **2.9.X+**.
 | sheet_name                | string  | no       | -                   |
 | xml_row_tag               | string  | no       | -                   |
 | xml_use_attr_format       | boolean | no       | -                   |
+| csv_use_header_line       | boolean | no       | false               |
 | file_filter_pattern       | string  | no       |                     |
 | compress_codec            | string  | no       | none                |
 | archive_compress_codec    | string  | no       | none                |
diff --git a/docs/en/connector-v2/source/S3File.md 
b/docs/en/connector-v2/source/S3File.md
index a5e5270307..ae82ca1133 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -201,10 +201,12 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | datetime_format                 | string  | no       | yyyy-MM-dd HH:mm:ss   
                                | Datetime type format, used to tell connector 
how to convert string to datetime, supported as the following 
formats:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` 
`yyyyMMddHHmmss`                                                                
                                                                                
                                       [...]
 | time_format                     | string  | no       | HH:mm:ss              
                                | Time type format, used to tell connector how 
to convert string to time, supported as the following formats:`HH:mm:ss` 
`HH:mm:ss.SSS`                                                                  
                                                                                
                                                                                
                      [...]
 | skip_header_row_number          | long    | no       | 0                     
                                | Skip the first few lines, but only for the 
txt and csv. For example, set like following:`skip_header_row_number = 2`. Then 
SeaTunnel will skip the first 2 lines from source files                         
                                                                                
                                                                                
                 [...]
+| csv_use_header_line             | boolean | no       | false                 
                                | Whether to use the header line to parse the 
file, only used when the file_format is `csv` and the file contains the header 
line that match RFC 4180                                                        
                                                                                
                                                 |
 | schema                          | config  | no       | -                     
                                | The schema of upstream data.                  
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | sheet_name                      | string  | no       | -                     
                                | Reader the sheet of the workbook,Only used 
when file_format is excel.                                                      
                                                                                
                                                                                
                                                                                
                 [...]
 | xml_row_tag                     | string  | no       | -                     
                                | Specifies the tag name of the data rows 
within the XML file, only valid for XML files.                                  
                                                                                
                                                                                
                                                                                
                    [...]
 | xml_use_attr_format             | boolean | no       | -                     
                                | Specifies whether to process data using the 
tag attribute format, only valid for XML files.                                 
                                                                                
                                                                                
                                                                                
                [...]
+| csv_use_header_line             | boolean | no       | false                 
                                | Whether to use the header line to parse the 
file, only used when the file_format is `csv` and the file contains the header 
line that match RFC 4180                                                        
                                                                                
                                                 |
 | compress_codec                  | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | archive_compress_codec          | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | encoding                        | string  | no       | UTF-8                 
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
diff --git a/docs/en/connector-v2/source/SftpFile.md 
b/docs/en/connector-v2/source/SftpFile.md
index c270a2516a..b6745cc681 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -93,6 +93,7 @@ The File does not have a specific type list, and we can 
indicate which SeaTunnel
 | sheet_name                | String  | No       | -                   | 
Reader the sheet of the workbook,Only used when file_format is excel.           
                                                                                
                                                                                
                                                                                
                                                |
 | xml_row_tag               | string  | no       | -                   | 
Specifies the tag name of the data rows within the XML file, only used when 
file_format is xml.                                                             
                                                                                
                                                                                
                                                    |
 | xml_use_attr_format       | boolean | no       | -                   | 
Specifies whether to process data using the tag attribute format, only used 
when file_format is xml.                                                        
                                                                                
                                                                                
                                                    |
+| csv_use_header_line       | boolean | no       | false               | 
Whether to use the header line to parse the file, only used when the 
file_format is `csv` and the file contains the header line that match RFC 4180  
                                                                                
                                                                                
                       |
 | schema                    | Config  | No       | -                   | 
Please check #schema below                                                      
                                                                                
                                                                                
                                                                                
                                                |
 | compress_codec            | String  | No       | None                | The 
compress codec of files and the details that supported as the following shown: 
<br/> - txt: `lzo` `None` <br/> - json: `lzo` `None` <br/> - csv: `lzo` `None` 
<br/> - orc: `lzo` `snappy` `lz4` `zlib` `None` <br/> - parquet: `lzo` `snappy` 
`lz4` `gzip` `brotli` `zstd` `None` <br/> Tips: excel type does Not support any 
compression format                            |
 | archive_compress_codec    | string  | no       | none                |
diff --git a/docs/zh/connector-v2/source/CosFile.md 
b/docs/zh/connector-v2/source/CosFile.md
index 584ce02f45..b160eafecd 100644
--- a/docs/zh/connector-v2/source/CosFile.md
+++ b/docs/zh/connector-v2/source/CosFile.md
@@ -47,30 +47,31 @@ import ChangeLog from '../changelog/connector-file-cos.md';
 
 ## 选项
 
-|名称                        |  类型    | 必需    | 默认值                |
-|---------------------------|---------|---------|---------------------|
-| path                      | string  | 是      | -                   |
-| file_format_type          | string  | 是      | -                   |
-| bucket                    | string  | 是      | -                   |
-| secret_id                 | string  | 是      | -                   |
-| secret_key                | string  | 是      | -                   |
-| region                    | string  | 是      | -                   |
-| read_columns              | list    | 是      | -                   |
-| delimiter/field_delimiter | string  | 否       | \001                |
-| parse_partition_from_path | boolean | 否       | true                |
-| skip_header_row_number    | long    | 否       | 0                   |
-| date_format               | string  | 否       | yyyy-MM-dd          |
-| datetime_format           | string  | 否       | yyyy-MM-dd HH:mm:ss |
-| time_format               | string  | 否       | HH:mm:ss            |
-| schema                    | config  | 否       | -                   |
-| sheet_name                | string  | 否       | -                   |
-| xml_row_tag               | string  | 否       | -                   |
-| xml_use_attr_format       | boolean | 否       | -                   |
-| file_filter_pattern       | string  | 否       |                     |
-| compress_codec            | string  | 否       | none                |
-| archive_compress_codec    | string  | 否       | none                |
-| encoding                  | string  | 否       | UTF-8               |
-| common-options            |         | 否       | -                   |
+| 名称                                    |  类型    | 必需  | 默认值                 |
+|---------------------------------------|---------|-----|---------------------|
+| path                                  | string  | 是   | -                   |
+| file_format_type                      | string  | 是   | -                   |
+| bucket                                | string  | 是   | -                   |
+| secret_id                             | string  | 是   | -                   |
+| secret_key                            | string  | 是   | -                   |
+| region                                | string  | 是   | -                   |
+| read_columns                          | list    | 是   | -                   |
+| delimiter/field_delimiter             | string  | 否   | \001                |
+| parse_partition_from_path             | boolean | 否   | true                |
+| skip_header_row_number                | long    | 否   | 0                   |
+| date_format                           | string  | 否   | yyyy-MM-dd          |
+| datetime_format                       | string  | 否   | yyyy-MM-dd HH:mm:ss |
+| time_format                           | string  | 否   | HH:mm:ss            |
+| schema                                | config  | 否   | -                   |
+| sheet_name                            | string  | 否   | -                   |
+| xml_row_tag                           | string  | 否   | -                   |
+| xml_use_attr_format                   | boolean | 否   | -                   |
+| csv_use_header_line                   | boolean | 否   | false               |
+| file_filter_pattern                   | string  | 否   |                     |
+| compress_codec                        | string  | 否   | none                |
+| archive_compress_codec                | string  | 否   | none                |
+| encoding                              | string  | 否   | UTF-8               |
+| common-options                        |         | 否   | -                   |
 
 ### path [string]
 
@@ -271,6 +272,11 @@ default `HH:mm:ss`
 仅当file_format为xml时才需要配置。
 指定是否使用标记属性格式处理数据。
 
+### csv_use_header_line [boolean]
+
+仅在文件格式为 csv 时可以选择配置。
+是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配
+
 ### file_filter_pattern [string]
 
 过滤模式,用于过滤文件。
diff --git a/docs/zh/connector-v2/source/FtpFile.md 
b/docs/zh/connector-v2/source/FtpFile.md
index 6aa1b9178d..c44a12c38c 100644
--- a/docs/zh/connector-v2/source/FtpFile.md
+++ b/docs/zh/connector-v2/source/FtpFile.md
@@ -39,32 +39,33 @@ import ChangeLog from '../changelog/connector-file-ftp.md';
 
 ## 配置项
 
-| 名称                      | 类型    | 是否必填 | 默认值              |
-|---------------------------|---------|----------|---------------------|
-| host                      | string  | 是       | -                   |
-| port                      | int     | 是       | -                   |
-| user                      | string  | 是       | -                   |
-| password                  | string  | 是       | -                   |
-| path                      | string  | 是       | -                   |
-| file_format_type          | string  | 是       | -                   |
-| connection_mode           | string  | 否       | active_local        |
-| delimiter/field_delimiter | string  | 否       | \001                |
-| read_columns              | list    | 否       | -                   |
-| parse_partition_from_path | boolean | 否       | true                |
-| date_format               | string  | 否       | yyyy-MM-dd          |
-| datetime_format           | string  | 否       | yyyy-MM-dd HH:mm:ss |
-| time_format               | string  | 否       | HH:mm:ss            |
-| skip_header_row_number    | long    | 否       | 0                   |
-| schema                    | config  | 否       | -                   |
-| sheet_name                | string  | 否       | -                   |
-| xml_row_tag               | string  | 否       | -                   |
-| xml_use_attr_format       | boolean | 否       | -                   |
-| file_filter_pattern       | string  | 否       | -                   |
-| compress_codec            | string  | 否       | none                |
-| archive_compress_codec    | string  | 否       | none                |
-| encoding                  | string  | 否       | UTF-8               |
-| null_format               | string  | 否       | -                   |
-| common-options            |         | 否       | -                   |
+| 名称                      | 类型    | 是否必填  | 默认值              |
+|---------------------------|---------|-------|---------------------|
+| host                      | string  | 是     | -                   |
+| port                      | int     | 是     | -                   |
+| user                      | string  | 是     | -                   |
+| password                  | string  | 是     | -                   |
+| path                      | string  | 是     | -                   |
+| file_format_type          | string  | 是     | -                   |
+| connection_mode           | string  | 否     | active_local        |
+| delimiter/field_delimiter | string  | 否     | \001                |
+| read_columns              | list    | 否     | -                   |
+| parse_partition_from_path | boolean | 否     | true                |
+| date_format               | string  | 否     | yyyy-MM-dd          |
+| datetime_format           | string  | 否     | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | 否     | HH:mm:ss            |
+| skip_header_row_number    | long    | 否     | 0                   |
+| schema                    | config  | 否     | -                   |
+| sheet_name                | string  | 否     | -                   |
+| xml_row_tag               | string  | 否     | -                   |
+| xml_use_attr_format       | boolean | 否     | -                   |
+| csv_use_header_line       | boolean | 否     | false               |
+| file_filter_pattern       | string  | 否     | -                   |
+| compress_codec            | string  | 否     | none                |
+| archive_compress_codec    | string  | 否     | none                |
+| encoding                  | string  | 否     | UTF-8               |
+| null_format               | string  | 否     | -                   |
+| common-options            |         | 否     | -                   |
 
 ### host [string]
 
@@ -313,6 +314,11 @@ SeaTunnel 将从源文件中跳过前 2 行。
 
 指定是否使用标签属性格式处理数据。
 
+### csv_use_header_line [boolean]
+
+仅在文件格式为 csv 时可以选择配置。
+是否使用标题行来解析文件, 标题行 与 RFC 4180 匹配        
+
 ### compress_codec [string]
 
 文件的压缩编解码器,支持的详细信息如下:
diff --git a/docs/zh/connector-v2/source/HdfsFile.md 
b/docs/zh/connector-v2/source/HdfsFile.md
index cf9c7c3687..4fa6993748 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -64,6 +64,8 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';
 | sheet_name                | string  | 否    | -              | 
读取工作簿的表格,仅在文件格式为 excel 时使用。                                                     
                                                                                
                                                                            |
 | compress_codec            | string  | 否    | none           | 文件的压缩编解码器。     
                                                                                
                                                                                
                                                             |
 | common-options            |         | 否    | -              | 源插件通用参数,请参阅 
[源通用选项](../../../en/connector-v2/source-common-options.md) 获取详细信息。              
                                                                                
                                                                |
+| csv_use_header_line       | boolean | 否    | false          | 
是否使用标题行来解析文件,仅当 file_format 为 `csv` 且文件包含与 RFC 4180 匹配的标题行时使用                   
                                                                                
                                                                                
      |
+
 
 ### delimiter/field_delimiter [string]
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index 3474538f74..b656f555e0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -129,6 +129,13 @@ public class BaseSourceConfigOptions {
                     .defaultValue(0L)
                     .withDescription("The number of rows to skip");
 
+    public static final Option<Boolean> CSV_USE_HEADER_LINE =
+            Options.key("csv_use_header_line")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "whether to use the header line to parse the file, 
only used when the file_format is csv");
+
     public static final Option<List<String>> READ_PARTITIONS =
             Options.key("read_partitions")
                     .listType()
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index cc5cb8820e..4cc7fb2628 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -51,8 +51,10 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class CsvReadStrategy extends AbstractReadStrategy {
@@ -67,6 +69,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     private int[] indexes;
     private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
     private CatalogTable inputCatalogTable;
+    private boolean firstLineAsHeader = 
BaseSourceConfigOptions.CSV_USE_HEADER_LINE.defaultValue();
 
     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
@@ -102,9 +105,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
         }
 
         CSVFormat csvFormat = CSVFormat.DEFAULT;
+        if (firstLineAsHeader) {
+            csvFormat = csvFormat.withFirstRecordAsHeader();
+        }
         try (BufferedReader reader =
                         new BufferedReader(new 
InputStreamReader(actualInputStream, encoding));
                 CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
+            // test and skip `\uFEFF` BOM
+            reader.mark(1);
+            int firstChar = reader.read();
+            if (firstChar != 0xFEFF) {
+                reader.reset();
+            }
+            // skip lines
             for (int i = 0; i < skipHeaderNumber; i++) {
                 if (reader.readLine() == null) {
                     throw new IOException(
@@ -114,10 +127,18 @@ public class CsvReadStrategy extends AbstractReadStrategy 
{
                 }
             }
             // read lines
+            List<String> headers = getHeaders(csvParser);
             for (CSVRecord csvRecord : csvParser) {
                 HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
-                for (int i = 0; i < 
inputCatalogTable.getTableSchema().getColumns().size(); i++) {
-                    fieldIdValueMap.put(i, csvRecord.get(i));
+                for (int i = 0; i < headers.size(); i++) {
+                    // the user input schema may not contain all the columns 
in the csv header
+                    // and may contain columns in a different order with the 
csv header
+                    int index =
+                            
inputCatalogTable.getSeaTunnelRowType().indexOf(headers.get(i), false);
+                    if (index == -1) {
+                        continue;
+                    }
+                    fieldIdValueMap.put(index, csvRecord.get(i));
                 }
                 SeaTunnelRow seaTunnelRow = 
deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
                 if (!readColumns.isEmpty()) {
@@ -152,6 +173,19 @@ public class CsvReadStrategy extends AbstractReadStrategy {
         }
     }
 
+    private List<String> getHeaders(CSVParser csvParser) {
+        List<String> headers;
+        if (firstLineAsHeader) {
+            headers = 
csvParser.getHeaderNames().stream().collect(Collectors.toList());
+        } else {
+            headers =
+                    inputCatalogTable.getTableSchema().getColumns().stream()
+                            .map(column -> column.getName())
+                            .collect(Collectors.toList());
+        }
+        return headers;
+    }
+
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
         this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
@@ -205,6 +239,10 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                                 readonlyConfig
                                         
.getOptional(BaseSourceConfigOptions.NULL_FORMAT)
                                         .orElse(null));
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key())) {
+            firstLineAsHeader =
+                    
pluginConfig.getBoolean(BaseSourceConfigOptions.CSV_USE_HEADER_LINE.key());
+        }
         if (isMergePartition) {
             deserializationSchema =
                     
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index fc69268584..63b94d82f6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -290,6 +290,14 @@ public class LocalFileIT extends TestSuiteBase {
                         "/csv/break_line.csv",
                         "/seatunnel/read/csv/break_line/break_line.csv",
                         container);
+                ContainerUtil.copyFileIntoContainers(
+                        "/csv/csv_with_header1.csv",
+                        "/seatunnel/read/csv/header/csv_with_header1.csv",
+                        container);
+                ContainerUtil.copyFileIntoContainers(
+                        "/csv/csv_with_header2.csv",
+                        "/seatunnel/read/csv/header/csv_with_header2.csv",
+                        container);
 
                 ContainerUtil.copyFileIntoContainers(
                         "/text/e2e_null_format.txt",
@@ -305,6 +313,7 @@ public class LocalFileIT extends TestSuiteBase {
         TestHelper helper = new TestHelper(container);
         helper.execute("/csv/fake_to_local_csv.conf");
         helper.execute("/csv/local_csv_to_assert.conf");
+        helper.execute("/csv/csv_with_header_to_assert.conf");
         helper.execute("/csv/breakline_csv_to_assert.conf");
         helper.execute("/excel/fake_to_local_excel.conf");
         helper.execute("/excel/local_excel_to_assert.conf");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
new file mode 100644
index 0000000000..25b892a001
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header1.csv
@@ -0,0 +1,2 @@
+name,id,is_female
+tom,20,true
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
new file mode 100644
index 0000000000..63c1d91e60
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header2.csv
@@ -0,0 +1,2 @@
+name,is_female,id
+tommy,false,30
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
new file mode 100644
index 0000000000..b0aaa6d6b2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/csv_with_header_to_assert.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/csv/header"
+    file_format_type = csv
+    csv_use_header_line = true
+    schema = {
+      fields {
+        id = int
+        name = string
+        is_female = boolean
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 2
+            }
+             {
+              rule_type = MIN_ROW
+              rule_value = 2
+            }
+          ]
+      field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+                {
+                  rule_type = NOT_NULL
+                }
+            ]
+          }
+            {
+           field_name = name
+           field_type = string
+           field_value = [
+               {
+                 rule_type = NOT_NULL
+               }
+           ]
+         }
+          {
+             field_name = is_female
+             field_type = boolean
+             field_value = [
+                 {
+                   rule_type = NOT_NULL
+                 }
+             ]
+           }
+        ]
+    }
+  }
+}


Reply via email to