This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d266f4db64 [Feature][Connector-V2] Add Huawei Cloud OBS connector (#4578)
d266f4db64 is described below

commit d266f4db643e75c815a054efaf8b3471d279cb2a
Author: Kim <en...@qq.com>
AuthorDate: Sat Jun 15 21:29:11 2024 +0800

    [Feature][Connector-V2] Add Huawei Cloud OBS connector (#4578)
---
 docs/en/connector-v2/sink/ObsFile.md               | 287 +++++++++++++++++
 docs/en/connector-v2/source/ObsFile.md             | 350 +++++++++++++++++++++
 plugin-mapping.properties                          |   2 +
 release-note.md                                    |   2 +
 .../seatunnel/file/config/FileSystemType.java      |   3 +-
 .../connector-file/connector-file-obs/pom.xml      |  80 +++++
 .../seatunnel/file/obs/config/ObsConf.java         |  55 ++++
 .../seatunnel/file/obs/config/ObsConfig.java       |  39 +++
 .../seatunnel/file/obs/sink/ObsFileSink.java       |  63 ++++
 .../file/obs/sink/ObsFileSinkFactory.java          |  88 ++++++
 .../seatunnel/file/obs/source/ObsFileSource.java   | 126 ++++++++
 .../file/obs/source/ObsFileSourceFactory.java      |  70 +++++
 .../services/org.apache.hadoop.fs.FileSystem       |  16 +
 .../seatunnel/file/obs/ObsFileFactoryTest.java}    |  28 +-
 seatunnel-connectors-v2/connector-file/pom.xml     |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 .../connector-file-obs-e2e/pom.xml                 |  45 +++
 .../e2e/connector/file/obs/ObsFileIT.java          | 113 +++++++
 .../src/test/resources/csv/fake_to_obs_csv.conf    |  85 +++++
 .../csv/obs_csv_projection_to_assert.conf          | 102 ++++++
 .../src/test/resources/csv/obs_csv_to_assert.conf  | 120 +++++++
 .../test/resources/excel/fake_to_obs_excel.conf    |  85 +++++
 .../excel/obs_excel_projection_to_assert.conf      | 112 +++++++
 .../test/resources/excel/obs_excel_to_assert.conf  | 120 +++++++
 .../test/resources/json/fake_to_obs_file_json.conf |  83 +++++
 .../resources/json/obs_file_json_to_assert.conf    | 114 +++++++
 .../test/resources/orc/fake_to_obs_file_orc.conf   |  84 +++++
 .../orc/obs_file_orc_projection_to_assert.conf     |  81 +++++
 .../test/resources/orc/obs_file_orc_to_assert.conf |  80 +++++
 .../parquet/fake_to_obs_file_parquet.conf          |  84 +++++
 .../obs_file_parquet_projection_to_assert.conf     |  81 +++++
 .../parquet/obs_file_parquet_to_assert.conf        |  80 +++++
 .../test/resources/text/fake_to_obs_file_text.conf |  84 +++++
 .../text/obs_file_text_projection_to_assert.conf   | 115 +++++++
 .../resources/text/obs_file_text_skip_headers.conf | 115 +++++++
 .../resources/text/obs_file_text_to_assert.conf    | 114 +++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 37 files changed, 3096 insertions(+), 19 deletions(-)

diff --git a/docs/en/connector-v2/sink/ObsFile.md 
b/docs/en/connector-v2/sink/ObsFile.md
new file mode 100644
index 0000000000..cfb1ec8c55
--- /dev/null
+++ b/docs/en/connector-v2/sink/ObsFile.md
@@ -0,0 +1,287 @@
+# ObsFile
+
+> Obs file sink connector
+
+## Support Those Engines
+
+> Spark
+>
+> Flink
+>
+> SeaTunnel Zeta
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use two-phase commit (2PC) to ensure `exactly-once` semantics.
+
+- [x] file format type
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+  - [x] excel
+
+## Description
+
+Output data to the Huawei Cloud OBS file system.
+
+If you use Spark or Flink, you must ensure that your cluster has already integrated Hadoop before using this connector. The tested Hadoop version is 2.x.
+
+If you use the SeaTunnel Engine, it integrates the Hadoop jar automatically when you download and install SeaTunnel Engine. You can check the jar packages under ${SEATUNNEL_HOME}/lib to confirm this.
+
+We made some trade-offs in order to support more file types, so we use the HDFS protocol for internal access to OBS, and this connector needs some Hadoop dependencies.
+It only supports Hadoop version **2.9.X+**.
+
+## Required Jar List
+
+|        jar         |     supported versions      |                                                     maven                                                      |
+|--------------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------|
+| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/)   |
+| esdk-obs-java      | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/)        |
+| okhttp             | support version >= 3.11.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/)                                          |
+| okio               | support version >= 1.14.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/)                                               |
+
+> Please download the jars listed in the 'maven' column above and copy them to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.
+>
+> And copy all of these jars to $SEATUNNEL_HOME/lib/.
+
+## Options
+
+|               name               |  type   | required |                  default                   |                                                 description                                                  |
+|----------------------------------|---------|----------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------|
+| path                             | string  | yes      | -                                          | The target dir path.                                                                                         |
+| bucket                           | string  | yes      | -                                          | The bucket address of obs file system, for example: `obs://obs-bucket-name`.                                 |
+| access_key                       | string  | yes      | -                                          | The access key of obs file system.                                                                           |
+| access_secret                    | string  | yes      | -                                          | The access secret of obs file system.                                                                        |
+| endpoint                         | string  | yes      | -                                          | The endpoint of obs file system.                                                                             |
+| custom_filename                  | boolean | no       | false                                      | Whether you need to customize the filename.                                                                  |
+| file_name_expression             | string  | no       | "${transactionId}"                         | Describes the file expression which will be created into the `path`. Only used when custom_filename is true. [Tips](#file_name_expression) |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Specify the time format of the `path`. Only used when custom_filename is true. [Tips](#filename_time_format) |
+| file_format_type                 | string  | no       | "csv"                                      | Supported file types. [Tips](#file_format_type)                                                              |
+| field_delimiter                  | string  | no       | '\001'                                     | The separator between columns in a row of data. Only used when file_format is text.                          |
+| row_delimiter                    | string  | no       | "\n"                                       | The separator between rows in a file. Only needed by `text` file format.                                     |
+| have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                                                                      |
+| partition_by                     | array   | no       | -                                          | Partition data based on selected fields. Only used when have_partition is true.                              |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true. [Tips](#partition_dir_expression)                                     |
+| is_partition_field_write_in_file | boolean | no       | false                                      | Only used when have_partition is true. [Tips](#is_partition_field_write_in_file)                             |
+| sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns. [Tips](#sink_columns)                             |
+| is_enable_transaction            | boolean | no       | true                                       | [Tips](#is_enable_transaction)                                                                               |
+| batch_size                       | int     | no       | 1000000                                    | [Tips](#batch_size)                                                                                          |
+| compress_codec                   | string  | no       | none                                       | [Tips](#compress_codec)                                                                                      |
+| common-options                   | object  | no       | -                                          | [Tips](#common_options)                                                                                      |
+| max_rows_in_memory               | int     | no       | -                                          | The maximum number of data items that can be cached in the memory. Only used when file_format is excel.      |
+| sheet_name                       | string  | no       | Sheet${Random number}                      | Write the sheet of the workbook. Only used when file_format is excel.                                        |
+
+### Tips
+
+#### <span id="file_name_expression"> file_name_expression </span>
+
+> Only used when `custom_filename` is `true`
+>
+> `file_name_expression` describes the file expression which will be created into the `path`.
+>
+> We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`.
+>
+> `${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` at the beginning of the filename.
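+
+A minimal sketch combining the three related options (the values are illustrative, not defaults):
+
+```hocon
+custom_filename = true
+file_name_expression = "output_${uuid}_${now}"
+filename_time_format = "yyyy.MM.dd"
+```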
+
+#### <span id="filename_time_format"> filename_time_format </span>
+
+> Only used when `custom_filename` is `true`
+>
+> When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol |    Description     |
+|--------|--------------------|
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+#### <span id="file_format_type"> file_format_type </span>
+
+> We support the following file types:
+>
+> `text` `json` `csv` `orc` `parquet` `excel`
+
+Please note that the final file name will end with the file format type's suffix; for example, the suffix of the text file type is `txt`.
+
+#### <span id="partition_dir_expression"> partition_dir_expression </span>
+
+> Only used when `have_partition` is `true`.
+>
+> If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+>
+> Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
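+
+A short sketch of the expansion (field and value illustrative): with the settings below, a row whose `age` is `26` is written under the `age=26/` directory.
+
+```hocon
+have_partition = true
+partition_by = ["age"]
+partition_dir_expression = "${k0}=${v0}"
+```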
+
+#### <span id="is_partition_field_write_in_file"> 
is_partition_field_write_in_file </span>
+
+> Only used when `have_partition` is `true`.
+>
+> If `is_partition_field_write_in_file` is `true`, the partition field and the 
value of it will be write into data file.
+>
+> For example, if you want to write a Hive Data File, Its value should be 
`false`.
+
+#### <span id="sink_columns"> sink_columns </span>
+
+> Which columns need to be written to the file; the default value is all of the columns obtained from the `Transform` or `Source`.
+> The order of the fields determines the order in which the file is actually written.
+
+#### <span id="is_enable_transaction"> is_enable_transaction </span>
+
+> If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+>
+> Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` at the beginning of the filename. Only `true` is supported for now.
+
+#### <span id="batch_size"> batch_size </span>
+
+> The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly decided by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into a file until the number of rows in it is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
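+
+A sketch of the interplay, assuming `checkpoint.interval` is set at the engine level in the job's `env` block (values illustrative):
+
+```hocon
+env {
+  checkpoint.interval = 10000
+}
+sink {
+  ObsFile {
+    batch_size = 1000000
+  }
+}
+```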
+
+#### <span id="compress_codec"> compress_codec </span>
+
+> The compress codec of files; the supported codecs are shown below:
+>
+> - txt: `lzo` `none`
+> - json: `lzo` `none`
+> - csv: `lzo` `none`
+> - orc: `lzo` `snappy` `lz4` `zlib` `none`
+> - parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
+Please note that the excel type does not support any compression format.
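+
+A minimal sketch enabling compression for an orc sink (the codec choice is illustrative; any codec from the orc list above works):
+
+```hocon
+file_format_type = "orc"
+compress_codec = "zlib"
+```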
+
+#### <span id="common_options"> common options </span>
+
+> Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Task Example
+
+### text file
+
+> For text file format with `have_partition`, `custom_filename` and `sink_columns`
+
+```hocon
+
+  ObsFile {
+    path="/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    have_partition = true
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    custom_filename = true
+    file_name_expression = "${transactionId}_${now}"
+    filename_time_format = "yyyy.MM.dd"
+    sink_columns = ["name","age"]
+    is_enable_transaction = true
+  }
+
+```
+
+### parquet file
+
+> For parquet file format with `have_partition` and `sink_columns`
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    have_partition = true
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_format_type = "parquet"
+    sink_columns = ["name","age"]
+  }
+
+```
+
+### orc file
+
+> For orc file format simple config
+
+```hocon
+
+  ObsFile {
+    path="/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "obs.xxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+  }
+
+```
+
+### json file
+
+> For json file format simple config
+
+```hocon
+
+   ObsFile {
+       path = "/seatunnel/json"
+       bucket = "obs://obs-bucket-name"
+       access_key = "xxxxxxxxxxx"
+       access_secret = "xxxxxxxxxxx"
+       endpoint = "obs.xxxxx.myhuaweicloud.com"
+       file_format_type = "json"
+   }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hocon
+
+   ObsFile {
+       path = "/seatunnel/excel"
+       bucket = "obs://obs-bucket-name"
+       access_key = "xxxxxxxxxxx"
+       access_secret = "xxxxxxxxxxx"
+       endpoint = "obs.xxxxx.myhuaweicloud.com"
+       file_format_type = "excel"
+   }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hocon
+
+   ObsFile {
+       path = "/seatunnel/csv"
+       bucket = "obs://obs-bucket-name"
+       access_key = "xxxxxxxxxxx"
+       access_secret = "xxxxxxxxxxx"
+       endpoint = "obs.xxxxx.myhuaweicloud.com"
+       file_format_type = "csv"
+   }
+
+```
+
+## Changelog
+
+### next version
+
+- Add Obs Sink Connector
+
diff --git a/docs/en/connector-v2/source/ObsFile.md 
b/docs/en/connector-v2/source/ObsFile.md
new file mode 100644
index 0000000000..b5363d7717
--- /dev/null
+++ b/docs/en/connector-v2/source/ObsFile.md
@@ -0,0 +1,350 @@
+# ObsFile
+
+> Obs file source connector
+
+## Support Those Engines
+
+> Spark
+>
+> Flink
+>
+> SeaTunnel Zeta
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+All the data in a split is read in a single pollNext call. The splits that have been read are saved in the snapshot.
+
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format type
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+  - [x] excel
+
+## Description
+
+Read data from the Huawei Cloud OBS file system.
+
+If you use Spark or Flink, you must ensure that your cluster has already integrated Hadoop before using this connector. The tested Hadoop version is 2.x.
+
+If you use the SeaTunnel Engine, it integrates the Hadoop jar automatically when you download and install SeaTunnel Engine. You can check the jar packages under ${SEATUNNEL_HOME}/lib to confirm this.
+
+We made some trade-offs in order to support more file types, so we use the HDFS protocol for internal access to OBS, and this connector needs some Hadoop dependencies.
+It only supports Hadoop version **2.9.X+**.
+
+## Required Jar List
+
+|        jar         |     supported versions      |                                                     maven                                                      |
+|--------------------|-----------------------------|-----------------------------------------------------------------------------------------------------------------|
+| hadoop-huaweicloud | support version >= 3.1.1.29 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/hadoop/hadoop-huaweicloud/)   |
+| esdk-obs-java      | support version >= 3.19.7.3 | [Download](https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/com/huawei/storage/esdk-obs-java/)        |
+| okhttp             | support version >= 3.11.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okhttp3/okhttp/)                                          |
+| okio               | support version >= 1.14.0   | [Download](https://repo1.maven.org/maven2/com/squareup/okio/okio/)                                               |
+
+> Please download the jars listed in the 'maven' column above and copy them to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.
+>
+> And copy all of these jars to $SEATUNNEL_HOME/lib/.
+
+## Options
+
+|           name            |  type   | required |       default       |                                                  description                                                   |
+|---------------------------|---------|----------|---------------------|-----------------------------------------------------------------------------------------------------------------|
+| path                      | string  | yes      | -                   | The target dir path                                                                                             |
+| file_format_type          | string  | yes      | -                   | File type. [Tips](#file_format_type)                                                                            |
+| bucket                    | string  | yes      | -                   | The bucket address of obs file system, for example: `obs://obs-bucket-name`                                     |
+| access_key                | string  | yes      | -                   | The access key of obs file system                                                                               |
+| access_secret             | string  | yes      | -                   | The access secret of obs file system                                                                            |
+| endpoint                  | string  | yes      | -                   | The endpoint of obs file system                                                                                 |
+| read_columns              | list    | yes      | -                   | The read column list of the data source; users can use it to implement field projection. [Tips](#read_columns)  |
+| delimiter                 | string  | no       | \001                | Field delimiter, used to tell the connector how to slice and dice fields when reading text files                |
+| parse_partition_from_path | boolean | no       | true                | Control whether to parse the partition keys and values from the file path. [Tips](#parse_partition_from_path)   |
+| skip_header_row_number    | long    | no       | 0                   | Skip the first few lines, but only for the txt and csv.                                                         |
+| date_format               | string  | no       | yyyy-MM-dd          | Date type format, used to tell the connector how to convert string to date. [Tips](#date_format)                |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert string to datetime. [Tips](#datetime_format)    |
+| time_format               | string  | no       | HH:mm:ss            | Time type format, used to tell the connector how to convert string to time. [Tips](#time_format)                |
+| schema                    | config  | no       | -                   | [Tips](#schema)                                                                                                 |
+| common-options            |         | no       | -                   | [Tips](#common_options)                                                                                         |
+| sheet_name                | string  | no       | -                   | Read the sheet of the workbook. Only used when file_format is excel.                                            |
+
+### Tips
+
+#### <span id="parse_partition_from_path"> parse_partition_from_path </span>
+
+> Control whether to parse the partition keys and values from the file path.
+>
+> For example, if you read a file from the path `obs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+>
+> Every record read from the file will have these two fields added:
+
+|     name      | age |
+|---------------|-----|
+| tyrantlucifer | 26  |
+
+> Do not define partition fields in the schema option.
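+
+A minimal sketch reading the partitioned path above (bucket and path illustrative):
+
+```hocon
+path = "/tmp/seatunnel/parquet"
+parse_partition_from_path = true
+```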
+
+#### <span id="date_format"> date_format </span>
+
+> Date type format, used to tell the connector how to convert string to date, supported as the following formats:
+>
+> `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+>
+> default `yyyy-MM-dd`
+
+### <span id="datetime_format"> datetime_format </span>
+
+> Datetime type format, used to tell the connector how to convert string to 
datetime, supported as the following formats:
+>
+> `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` 
`yyyyMMddHHmmss`
+>
+> default `yyyy-MM-dd HH:mm:ss`
+
+### <span id="time_format"> time_format </span>
+
+> Time type format, used to tell the connector how to convert string to time, 
supported as the following formats:
+>
+> `HH:mm:ss` `HH:mm:ss.SSS`
+>
+> default `HH:mm:ss`
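+
+A sketch overriding all three string-to-temporal formats at once (values picked from the supported lists above):
+
+```hocon
+date_format = "yyyy.MM.dd"
+datetime_format = "yyyy.MM.dd HH:mm:ss"
+time_format = "HH:mm:ss.SSS"
+```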
+
+### <span id="skip_header_row_number"> skip_header_row_number </span>
+
+> Skip the first few lines, but only for the txt and csv.
+>
+> For example, set like following:
+>
+> `skip_header_row_number = 2`
+>
+> Then Seatunnel will skip the first 2 lines from source files
+
+### <span id="file_format_type"> file_format_type </span>
+
+> File type, supported as the following file types:
+>
+> `text` `csv` `parquet` `orc` `json` `excel`
+>
+> If you assign file type to `json`, you should also assign schema option to 
tell the connector how to parse data to the row you want.
+>
+> For example,upstream data is the following:
+>
+> ```json
+>
+> ```
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+> You can also save multiple pieces of data in one file and split them by one newline:
+
+```json lines
+
+{"code":  200, "data":  "get success", "success":  true}
+{"code":  300, "data":  "get failed", "success":  false}
+
+```
+
+> You should assign the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+> The connector will generate data as follows:
+
+| code |    data     | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+> If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.
+>
+> If you assign the file type to `text` or `csv`, you can choose whether to specify the schema information.
+>
+> For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+> If you do not assign a data schema, the connector will treat the upstream data as follows:
+
+|        content        |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+> If you assign a data schema, you should also assign the option `delimiter`, except for the CSV file type.
+>
+> You should assign the schema and delimiter as follows:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+> The connector will generate data as follows:
+
+|     name      | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
+
+#### <span id="schema"> schema  </span>
+
+##### fields
+
+> The schema of upstream data.
+
+#### <span id="schema"> read_columns </span>
+
+> The read column list of the data source, user can use it to implement field 
projection.
+>
+> The file type supported column projection as the following shown:
+
+- text
+- json
+- csv
+- orc
+- parquet
+- excel
+
+> If you want to use this feature when reading `text`, `json` or `csv` files, the schema option must be configured, as in the sketch below.
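+
+A sketch of field projection on a text source, reusing the schema from the example above (field names illustrative):
+
+```hocon
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string
+    }
+}
+read_columns = ["name", "age"]
+```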
+
+#### <span id="common_options "> common options </span>
+
+> Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
+
+## Task Example
+
+### text file
+
+> For text file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+  }
+
+```
+
+### parquet file
+
+> For parquet file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "parquet"
+  }
+
+```
+
+### orc file
+
+> For orc file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+  }
+
+```
+
+### json file
+
+> For json file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/json"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "json"
+  }
+
+```
+
+### excel file
+
+> For excel file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "excel"
+  }
+
+```
+
+### csv file
+
+> For csv file format simple config
+
+```hocon
+
+  ObsFile {
+    path = "/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "csv"
+    delimiter = ","
+  }
+
+```
+
+## Changelog
+
+### next version
+
+- Add Obs File Source Connector
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 314d453ffc..411e42b880 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -124,3 +124,5 @@ seatunnel.sink.Easysearch = connector-easysearch
 seatunnel.source.Postgres-CDC = connector-cdc-postgres
 seatunnel.source.Oracle-CDC = connector-cdc-oracle
 seatunnel.sink.Pulsar = connector-pulsar
+seatunnel.source.ObsFile = connector-file-obs
+seatunnel.sink.ObsFile = connector-file-obs
diff --git a/release-note.md b/release-note.md
index b799df78f7..840648fe64 100644
--- a/release-note.md
+++ b/release-note.md
@@ -177,6 +177,7 @@
 - [Connector-V2] [Kafka] Kafka source supports data deserialization failure skipping (#4364)
 - [Connector-V2] [Jdbc] [TiDB] Add TiDB catalog (#4438)
 - [Connector-V2] [File] Add file excel sink and source (#4164)
+- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector (#4577)
 - [Connector-v2] [Snowflake] Add Snowflake Source&Sink connector (#4470)
 - [Connector-V2] [Pulsar] support read format for pulsar (#4111)
 - [Connector-V2] [Paimon] Introduce paimon connector (#4178)
@@ -193,6 +194,7 @@
 - [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
 - [Connector-V2] [Assert] Support field type assert and field value equality assert for full data types (#6275)
 - [Connector-V2] [Iceberg] Support iceberg sink #6198
+- [Connector-V2] [FILE-OBS] Add Huawei Cloud OBS connector #4578
 
 ### Zeta(ST-Engine)
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 3d3965b7c3..c7a5c26aec 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -27,7 +27,8 @@ public enum FileSystemType implements Serializable {
     COS("CosFile"),
     FTP("FtpFile"),
     SFTP("SftpFile"),
-    S3("S3File");
+    S3("S3File"),
+    OBS("ObsFile");
 
     private final String fileSystemPluginName;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml
new file mode 100644
index 0000000000..00676916fd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>connector-file</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-file-obs</artifactId>
+    <name>SeaTunnel : Connectors V2 : File : Obs</name>
+
+    <properties>
+        <hadoop-huaweicloud.version>3.1.1.29</hadoop-huaweicloud.version>
+        <esdk.version>3.19.7.3</esdk.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-huaweicloud</artifactId>
+            <version>${hadoop-huaweicloud.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.huawei.storage</groupId>
+                    <artifactId>esdk-obs-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.huawei.storage</groupId>
+            <artifactId>esdk-obs-java</artifactId>
+            <version>${esdk.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>huaweiCloud</id>
+            <url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/</url>
+        </repository>
+    </repositories>
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
new file mode 100644
index 0000000000..714e7ede17
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.fs.obs.Constants;
+
+import java.util.HashMap;
+
+public class ObsConf extends HadoopConf {
+    private static final String HDFS_IMPL = "org.apache.hadoop.fs.obs.OBSFileSystem";
+    private static final String SCHEMA = "obs";
+
+    @Override
+    public String getFsHdfsImpl() {
+        return HDFS_IMPL;
+    }
+
+    @Override
+    public String getSchema() {
+        return SCHEMA;
+    }
+
+    public ObsConf(String hdfsNameKey) {
+        super(hdfsNameKey);
+    }
+
+    public static HadoopConf buildWithConfig(Config config) {
+        HadoopConf hadoopConf = new ObsConf(config.getString(ObsConfig.BUCKET.key()));
+        // map the SeaTunnel options onto the hadoop-huaweicloud configuration keys
+        HashMap<String, String> obsOptions = new HashMap<>();
+        obsOptions.put(Constants.ACCESS_KEY, config.getString(ObsConfig.ACCESS_KEY.key()));
+        obsOptions.put(Constants.SECRET_KEY, config.getString(ObsConfig.ACCESS_SECRET.key()));
+        obsOptions.put(Constants.ENDPOINT, config.getString(ObsConfig.ENDPOINT.key()));
+        hadoopConf.setExtraOptions(obsOptions);
+        return hadoopConf;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java
new file mode 100644
index 0000000000..a4893f6c15
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+
+public class ObsConfig extends BaseSourceConfigOptions {
+    public static final Option<String> ACCESS_KEY =
+            Options.key("access_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("OBS bucket access key");
+    public static final Option<String> ACCESS_SECRET =
+            Options.key("access_secret")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("OBS bucket access secret");
+    public static final Option<String> ENDPOINT =
+            Options.key("endpoint").stringType().noDefaultValue().withDescription("OBS endpoint");
+    public static final Option<String> BUCKET =
+            Options.key("bucket").stringType().noDefaultValue().withDescription("OBS bucket");
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
new file mode 100644
index 0000000000..8f303b6a45
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class ObsFileSink extends BaseFileSink {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.OBS.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        super.prepare(pluginConfig);
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig,
+                        ObsConfig.FILE_PATH.key(),
+                        ObsConfig.BUCKET.key(),
+                        ObsConfig.ACCESS_KEY.key(),
+                        ObsConfig.ACCESS_SECRET.key(),
+                        ObsConfig.ENDPOINT.key());
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, result.getMsg()));
+        }
+        hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
new file mode 100644
index 0000000000..8f1c221e07
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class ObsFileSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.OBS.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ObsConfig.FILE_PATH)
+                .required(ObsConfig.BUCKET)
+                .required(ObsConfig.ACCESS_KEY)
+                .required(ObsConfig.ACCESS_SECRET)
+                .required(ObsConfig.ENDPOINT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSinkConfig.ROW_DELIMITER,
+                        BaseSinkConfig.FIELD_DELIMITER,
+                        BaseSinkConfig.TXT_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
+                .conditional(
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.PARQUET,
+                        BaseSinkConfig.PARQUET_COMPRESS)
+                .optional(BaseSinkConfig.CUSTOM_FILENAME)
+                .conditional(
+                        BaseSinkConfig.CUSTOM_FILENAME,
+                        true,
+                        BaseSinkConfig.FILE_NAME_EXPRESSION,
+                        BaseSinkConfig.FILENAME_TIME_FORMAT)
+                .optional(BaseSinkConfig.HAVE_PARTITION)
+                .conditional(
+                        BaseSinkConfig.HAVE_PARTITION,
+                        true,
+                        BaseSinkConfig.PARTITION_BY,
+                        BaseSinkConfig.PARTITION_DIR_EXPRESSION,
+                        BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE)
+                .optional(BaseSinkConfig.SINK_COLUMNS)
+                .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION)
+                .optional(BaseSinkConfig.DATE_FORMAT)
+                .optional(BaseSinkConfig.DATETIME_FORMAT)
+                .optional(BaseSinkConfig.TIME_FORMAT)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
new file mode 100644
index 0000000000..cf3061a44a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSource.class)
+public class ObsFileSource extends BaseFileSource {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.OBS.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        pluginConfig,
+                        ObsConfig.FILE_PATH.key(),
+                        ObsConfig.FILE_FORMAT_TYPE.key(),
+                        ObsConfig.ENDPOINT.key(),
+                        ObsConfig.ACCESS_KEY.key(),
+                        ObsConfig.ACCESS_SECRET.key(),
+                        ObsConfig.BUCKET.key());
+        if (!result.isSuccess()) {
+            throw new FileConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
+        }
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()));
+        readStrategy.setPluginConfig(pluginConfig);
+        hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+        readStrategy.init(hadoopConf);
+        String path = pluginConfig.getString(ObsConfig.FILE_PATH.key());
+        try {
+            filePaths = readStrategy.getFileNamesByPath(path);
+        } catch (IOException e) {
+            String errorMsg = String.format("Get file list from this path [%s] failed", path);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
+        }
+        // support user-defined schema
+        FileFormat fileFormat =
+                FileFormat.valueOf(
+                        pluginConfig.getString(ObsConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
+        // only json text csv type support user-defined schema now
+        if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                case EXCEL:
+                    SeaTunnelRowType userDefinedSchema =
+                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new FileConnectorException(
+                            CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema 
for [parquet, orc] files");
+                default:
+                    // never got in there
+                    throw new FileConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
+            }
+        } else {
+            if (filePaths.isEmpty()) {
+                // When the directory is empty, fall back to the default simple text schema
+                rowType = CatalogTableUtil.buildSimpleTextSchema();
+                return;
+            }
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
+            } catch (FileConnectorException e) {
+                String errorMsg =
+                        String.format("Get table schema from file [%s] failed", filePaths.get(0));
+                throw new FileConnectorException(
+                        CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
new file mode 100644
index 0000000000..e1cd0ee97b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.obs.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConfig;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+
+@AutoService(Factory.class)
+public class ObsFileSourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return FileSystemType.OBS.getFileSystemPluginName();
+    }
+
+    @Override
+    public OptionRule optionRule() {
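+        // OBS connection options and the file format are always required; a field
+        // delimiter only applies to text files, and a user-defined schema is only
+        // accepted for text, json, excel and csv sources.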
+        return OptionRule.builder()
+                .required(ObsConfig.FILE_PATH)
+                .required(ObsConfig.BUCKET)
+                .required(ObsConfig.ACCESS_KEY)
+                .required(ObsConfig.ACCESS_SECRET)
+                .required(ObsConfig.ENDPOINT)
+                .required(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfigOptions.FIELD_DELIMITER)
+                .conditional(
+                        BaseSourceConfigOptions.FILE_FORMAT_TYPE,
+                        Arrays.asList(
+                                FileFormat.TEXT, FileFormat.JSON, FileFormat.EXCEL, FileFormat.CSV),
+                        TableSchemaOptions.SCHEMA)
+                .optional(BaseSourceConfigOptions.PARSE_PARTITION_FROM_PATH)
+                .optional(BaseSourceConfigOptions.DATE_FORMAT)
+                .optional(BaseSourceConfigOptions.DATETIME_FORMAT)
+                .optional(BaseSourceConfigOptions.TIME_FORMAT)
+                .build();
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return ObsFileSource.class;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000000..7559087140
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
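+# Registers OBSFileSystem so Hadoop can discover the OBS implementation via
+# Java's ServiceLoader when resolving obs:// paths.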
+org.apache.hadoop.fs.obs.OBSFileSystem
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java
similarity index 58%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java
index 3d3965b7c3..dc7c34525a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/obs/ObsFileFactoryTest.java
@@ -15,27 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.obs;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.sink.ObsFileSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.obs.source.ObsFileSourceFactory;
 
-public enum FileSystemType implements Serializable {
-    HDFS("HdfsFile"),
-    LOCAL("LocalFile"),
-    OSS("OssFile"),
-    OSS_JINDO("OssJindoFile"),
-    COS("CosFile"),
-    FTP("FtpFile"),
-    SFTP("SftpFile"),
-    S3("S3File");
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-    private final String fileSystemPluginName;
+public class ObsFileFactoryTest {
 
-    FileSystemType(String fileSystemPluginName) {
-        this.fileSystemPluginName = fileSystemPluginName;
-    }
-
-    public String getFileSystemPluginName() {
-        return fileSystemPluginName;
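+    // Smoke test: both OBS factories should expose a non-null option rule.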
+    @Test
+    void optionRule() {
+        Assertions.assertNotNull((new ObsFileSourceFactory()).optionRule());
+        Assertions.assertNotNull((new ObsFileSinkFactory()).optionRule());
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index 4bdfa981ce..efb32ab444 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -38,6 +38,7 @@
         <module>connector-file-base-hadoop</module>
         <module>connector-file-sftp</module>
         <module>connector-file-s3</module>
+        <module>connector-file-obs</module>
         <module>connector-file-jindo-oss</module>
         <module>connector-file-cos</module>
     </modules>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 28b987b401..59ce612230 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -541,6 +541,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
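+                    <!-- Make the OBS connector available to the distribution assembly. -->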
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-file-obs</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-paimon</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
new file mode 100644
index 0000000000..9ee7fd4f14
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <artifactId>connector-file-obs-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : File Obs</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-obs</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
new file mode 100644
index 0000000000..c5a87959d6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/obs/ObsFileIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.obs;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.flink.Flink13Container;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Disabled("Please testing it in your local environment with obs account conf")
+public class ObsFileIT extends TestSuiteBase {
+
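+    // Each format is exercised as a write-then-read round trip: a FakeSource job
+    // writes files to OBS, then follow-up jobs read them back into an Assert sink.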
+    @TestTemplate
+    public void testObsFileReadAndWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        if (container instanceof Flink13Container) {
+            return;
+        }
+        // test write obs csv file
+        Container.ExecResult csvWriteResult = container.executeJob("/csv/fake_to_obs_csv.conf");
+        Assertions.assertEquals(0, csvWriteResult.getExitCode(), csvWriteResult.getStderr());
+        // test read obs csv file
+        Container.ExecResult csvReadResult = container.executeJob("/csv/obs_csv_to_assert.conf");
+        Assertions.assertEquals(0, csvReadResult.getExitCode(), csvReadResult.getStderr());
+        // test read obs csv file with projection
+        Container.ExecResult csvProjectionReadResult =
+                container.executeJob("/csv/obs_csv_projection_to_assert.conf");
+        Assertions.assertEquals(
+                0, csvProjectionReadResult.getExitCode(), csvProjectionReadResult.getStderr());
+        // test write obs excel file
+        Container.ExecResult excelWriteResult =
+                container.executeJob("/excel/fake_to_obs_excel.conf");
+        Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+        // test read obs excel file
+        Container.ExecResult excelReadResult =
+                container.executeJob("/excel/obs_excel_to_assert.conf");
+        Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+        // test read obs excel file with projection
+        Container.ExecResult excelProjectionReadResult =
+                container.executeJob("/excel/obs_excel_projection_to_assert.conf");
+        Assertions.assertEquals(
+                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+        // test write obs text file
+        Container.ExecResult textWriteResult =
+                container.executeJob("/text/fake_to_obs_file_text.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // test read skip header
+        Container.ExecResult textWriteAndSkipResult =
+                container.executeJob("/text/obs_file_text_skip_headers.conf");
+        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        // test read obs text file
+        Container.ExecResult textReadResult =
+                container.executeJob("/text/obs_file_text_to_assert.conf");
+        Assertions.assertEquals(0, textReadResult.getExitCode());
+        // test read obs text file with projection
+        Container.ExecResult textProjectionResult =
+                container.executeJob("/text/obs_file_text_projection_to_assert.conf");
+        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        // test write obs json file
+        Container.ExecResult jsonWriteResult =
+                container.executeJob("/json/fake_to_obs_file_json.conf");
+        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        // test read obs json file
+        Container.ExecResult jsonReadResult =
+                container.executeJob("/json/obs_file_json_to_assert.conf");
+        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        // test write obs orc file
+        Container.ExecResult orcWriteResult =
+                container.executeJob("/orc/fake_to_obs_file_orc.conf");
+        Assertions.assertEquals(0, orcWriteResult.getExitCode());
+        // test read obs orc file
+        Container.ExecResult orcReadResult =
+                container.executeJob("/orc/obs_file_orc_to_assert.conf");
+        Assertions.assertEquals(0, orcReadResult.getExitCode());
+        // test read obs orc file with projection
+        Container.ExecResult orcProjectionResult =
+                container.executeJob("/orc/obs_file_orc_projection_to_assert.conf");
+        Assertions.assertEquals(0, orcProjectionResult.getExitCode());
+        // test write obs parquet file
+        Container.ExecResult parquetWriteResult =
+                container.executeJob("/parquet/fake_to_obs_file_parquet.conf");
+        Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+        // test read obs parquet file
+        Container.ExecResult parquetReadResult =
+                container.executeJob("/parquet/obs_file_parquet_to_assert.conf");
+        Assertions.assertEquals(0, parquetReadResult.getExitCode());
+        // test read obs parquet file with projection
+        Container.ExecResult parquetProjectionResult =
+                container.executeJob("/parquet/obs_file_parquet_projection_to_assert.conf");
+        Assertions.assertEquals(0, parquetProjectionResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf
new file mode 100644
index 0000000000..8ed1e64fce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/fake_to_obs_csv.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  ObsFile {
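+    # bucket, credentials and endpoint are placeholders; point them at a real
+    # OBS bucket before running this job locally.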
+    path="/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format_type="csv"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf
new file mode 100644
index 0000000000..da22e3e90a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_projection_to_assert.conf
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+source {
+  ObsFile {
+    path="/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    result_table_name = "fake"
+    file_format_type = csv
+    delimiter = ","
+    read_columns = [c_string, c_boolean]
+    skip_header_row_number = 1
+    schema = {
+      fields {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+          }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules {
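+      # row_rules bound the expected row count; field_rules check that the
+      # projected columns arrive non-null.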
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf
new file mode 100644
index 0000000000..52bbcf5ab9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/csv/obs_csv_to_assert.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path="/seatunnel/csv"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    result_table_name = "fake"
+    file_format_type = csv
+    delimiter = ","
+    skip_header_row_number = 1
+    schema = {
+      fields {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+          }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf
new file mode 100644
index 0000000000..79ff16eb1a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/fake_to_obs_excel.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  ObsFile {
+    path="/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format_type="excel"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
new file mode 100644
index 0000000000..4ae33021fc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_projection_to_assert.conf
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path="/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    result_table_name = "fake"
+    file_format_type = excel
+    delimiter = ";"
+    read_columns = [c_string, c_boolean]
+    skip_header_row_number = 1
+    schema = {
+      fields {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+          }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf
new file mode 100644
index 0000000000..45144959b0
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/excel/obs_excel_to_assert.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path="/seatunnel/excel"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    result_table_name = "fake"
+    file_format_type = excel
+    delimiter = ";"
+    skip_header_row_number = 1
+    schema = {
+      fields {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+          c_row = {
+            c_map = "map<string, string>"
+            c_array = "array<int>"
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_bytes = bytes
+            c_date = date
+            c_decimal = "decimal(38, 18)"
+            c_timestamp = timestamp
+          }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf
new file mode 100644
index 0000000000..1cd92373f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/fake_to_obs_file_json.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/json"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
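+    # a "\n" row delimiter yields newline-delimited JSON, one object per line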
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "json"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf
new file mode 100644
index 0000000000..76f746bcb2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/json/obs_file_json_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/json"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "json"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf
new file mode 100644
index 0000000000..bb531a3c13
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/fake_to_obs_file_orc.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "orc"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "zlib"
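+    # the compression codec is recorded in the ORC file itself, so the matching
+    # read job does not need to configure it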
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf
new file mode 100644
index 0000000000..b89bed9a49
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_projection_to_assert.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+    read_columns = [c_string, c_boolean, c_double]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf
new file mode 100644
index 0000000000..4d5ab63f5e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/orc/obs_file_orc_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/orc"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "orc"
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
new file mode 100644
index 0000000000..bf696c2494
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/fake_to_obs_file_parquet.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "parquet"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "gzip"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
new file mode 100644
index 0000000000..3ca1c80122
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_projection_to_assert.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "parquet"
+    read_columns = [c_string, c_boolean, c_double]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf
new file mode 100644
index 0000000000..67b0146efb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/parquet/obs_file_parquet_to_assert.conf
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/parquet"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "parquet"
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf
new file mode 100644
index 0000000000..4b78f77d47
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/fake_to_obs_file_text.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
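+    # write lzo-compressed text files; the matching read job needs lzo
+    # codec support to decode them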
+    compress_codec = "lzo"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
new file mode 100644
index 0000000000..09853ce067
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_projection_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
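+    # text files carry no embedded schema, so the full schema is declared
+    # explicitly and read_columns selects the projected subset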
+    read_columns = [c_string, c_boolean, c_double]
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
new file mode 100644
index 0000000000..452fb79fd8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_skip_headers.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
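+    # treat the first row of each file as a header and skip it, which is
+    # why the assert below expects at most 4 rows rather than the usual 5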
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 4
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
new file mode 100644
index 0000000000..86742f67d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-obs-e2e/src/test/resources/text/obs_file_text_to_assert.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  ObsFile {
+    path = "/seatunnel/text"
+    bucket = "obs://obs-bucket-name"
+    access_key = ""
+    access_secret = ""
+    endpoint = "obs.xxxxxx.myhuaweicloud.com"
+    file_format_type = "text"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 9e236b7439..35a002fc4b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -63,6 +63,7 @@
         <module>connector-maxcompute-e2e</module>
         <module>connector-google-firestore-e2e</module>
         <module>connector-rocketmq-e2e</module>
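+        <!-- e2e test module for the Huawei Cloud OBS file connector -->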
+        <module>connector-file-obs-e2e</module>
         <module>connector-file-ftp-e2e</module>
         <module>connector-pulsar-e2e</module>
         <module>connector-paimon-e2e</module>
