This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37999ea5bb [Feature][Connector-V2] Enable file split for S3File source 
(#10450)
37999ea5bb is described below

commit 37999ea5bbac070163b4c9f9e322781fa940a5ee
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Mar 11 17:14:45 2026 +0800

    [Feature][Connector-V2] Enable file split for S3File source (#10450)
---
 docs/en/connectors/source/S3File.md                |  28 ++++-
 docs/zh/connectors/source/S3File.md                |  27 +++++
 .../seatunnel/file/config/FileBaseOptions.java     |   4 +-
 .../file/source/reader/CsvReadStrategy.java        |  19 ++-
 .../file/source/reader/TextReadStrategy.java       |   5 +-
 .../split/AccordingToSplitSizeSplitStrategy.java   |   4 +-
 .../source/split/FileSplitStrategyFactory.java     |  34 +++++-
 .../source/split/ParquetFileSplitStrategy.java     |  15 ++-
 .../reader/ReadStrategySplitFallbackTest.java      | 132 +++++++++++++++++++++
 .../source/split/FileSplitStrategyFactoryTest.java |  88 ++++++++++++++
 .../seatunnel/file/s3/source/S3FileSource.java     |   6 +-
 .../file/s3/source/S3FileSourceFactory.java        |  12 ++
 .../seatunnel/file/s3/S3FileFactoryTest.java       |  71 +++++++++++
 .../e2e/connector/file/s3/S3FileWithFilterIT.java  |  22 ++++
 .../test/resources/text/e2e_split_with_header.txt  |   6 +
 .../text/s3_file_text_enable_split_to_assert.conf  |  79 ++++++++++++
 16 files changed, 532 insertions(+), 20 deletions(-)

diff --git a/docs/en/connectors/source/S3File.md 
b/docs/en/connectors/source/S3File.md
index 36b3a8590f..6446dc224a 100644
--- a/docs/en/connectors/source/S3File.md
+++ b/docs/en/connectors/source/S3File.md
@@ -215,6 +215,8 @@ If you assign file type to `parquet` `orc`, schema option 
not required, connecto
 | csv_use_header_line             | boolean | no       | false                 
                                | Whether to use the header line to parse the 
file, only used when the file_format is `csv` and the file contains the header 
line that match RFC 4180                                                        
                                                                                
                                                                                
                 [...]
 | compress_codec                  | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | archive_compress_codec          | string  | no       | none                  
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| enable_file_split               | boolean | no       | false                 
                                | Turn on logical file split to improve 
parallelism for huge files. Only supported for `text`/`csv`/`json`/`parquet` 
and non-compressed format.                                                      
                                                                                
                                                         |
+| file_split_size                 | long    | no       | 134217728             
                                | Split size in bytes when 
`enable_file_split=true`. For `text`/`csv`/`json`, the split end will be 
aligned to the next `row_delimiter`. For `parquet`, the split unit is RowGroup 
and will never break a RowGroup.                                                
                                                                           |
 | encoding                        | string  | no       | UTF-8                 
                                |                                               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | null_format                     | string  | no       | -                     
                                | Only used when file_format_type is text. 
null_format to define which strings can be represented as null. e.g: `\N`       
                                                                                
                                                                                
                                                                                
                   [...]
 | binary_chunk_size               | int     | no       | 1024                  
                                | Only used when file_format_type is binary. 
The chunk size (in bytes) for reading binary files. Default is 1024 bytes. 
Larger values may improve performance for large files but use more memory.      
                                                                                
                                                                                
                      [...]
@@ -311,6 +313,30 @@ The result of this example matching is:
 /data/seatunnel/20241005/old_data.csv
 ```
 
+### enable_file_split [boolean]
+
+Enable the file splitting feature. Defaults to `false`. It is only available when the file type is `csv`, `text`, `json`, or `parquet` and the file is in a non-compressed format.
+
+- `text`/`csv`/`json`: split by `file_split_size` and align to the next 
`row_delimiter` to avoid breaking records.
+- `parquet`: split by RowGroup (logical split), never breaks a RowGroup.
+
+**Recommendations**
+- Enable when reading a few large files and you want higher read parallelism.
+- Disable when reading many small files, or when parallelism is low (splitting 
adds overhead).
+
+**Limitations**
+- Not supported for compressed files (`compress_codec` != `none`) or archive 
files (`archive_compress_codec` != `none`) — it will fall back to non-splitting 
and emit a warning log.
+- For `text`/`csv`/`json`, actual split size may be larger than 
`file_split_size` because the split end is aligned to the next `row_delimiter`.
+- For `json`, splitting is only supported for JSON Lines (one JSON object per 
line).
+- When splitting is enabled, global record order is not guaranteed because 
splits can be processed in parallel. Set `parallelism=1` if strict ordering is 
required.
+
+### file_split_size [long]
+
+The split size in bytes, effective when `enable_file_split` is `true`. The default value is 128MB, which is 134217728 bytes.
+
+**Tuning**
+- Start with the default (128MB). Decrease it if parallelism is 
under-utilized; increase it if the number of splits is too large.
+
 ### compress_codec [string]
 
 The compress codec of files and the details that supported as the following 
shown:
@@ -504,4 +530,4 @@ sink {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/zh/connectors/source/S3File.md 
b/docs/zh/connectors/source/S3File.md
index 4c20504f1c..75de5ef656 100644
--- a/docs/zh/connectors/source/S3File.md
+++ b/docs/zh/connectors/source/S3File.md
@@ -214,6 +214,8 @@ schema {
 | xml_use_attr_format             | boolean | 否    | -                         
                            | 指定是否使用标签属性格式处理数据,仅对XML文件有效。                       
                                                                                
                                                                                
                                                                                
                    |
 | compress_codec                  | string  | 否    | none                      
                            |                                                   
                                                                                
                                                                                
                                                                                
                    |
 | archive_compress_codec          | string  | 否    | none                      
                            |                                                   
                                                                                
                                                                                
                                                                                
                    |
+| enable_file_split               | boolean | 否    | false                     
                            | 开启大文件拆分以提升并行度。仅支持 `text`/`csv`/`json`/`parquet` 
且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。                  
                                                               |
+| file_split_size                 | long    | 否    | 134217728                 
                            | `enable_file_split=true` 
时生效,单位字节。`text`/`csv`/`json` 按 `file_split_size` 拆分并对齐到下一个 
`row_delimiter`;`parquet` 以 RowGroup 为拆分单位,不会切开 RowGroup。                       
                         |
 | encoding                        | string  | 否    | UTF-8                     
                            |                                                   
                                                                                
                                                                                
                                                                                
                    |
 | null_format                     | string  | 否    | -                         
                            | 
仅在file_format_type为text时使用。null_format用于定义哪些字符串可以表示为null。例如:`\N`                
                                                                                
                                                                                
                                                                      |
 | binary_chunk_size               | int     | 否    | 1024                      
                            | 
仅在file_format_type为binary时使用。读取二进制文件的块大小(以字节为单位)。默认为1024字节。较大的值可能会提高大文件的性能,但会使用更多内存。
                                                                                
                                                                                
                                                                  |
@@ -298,6 +300,31 @@ abc.*
 /data/seatunnel/20241005/old_data.csv
 ```
 
+### enable_file_split [boolean]
+
+开启大文件拆分功能,默认 false。仅支持 `csv`/`text`/`json`/`parquet` 
且非压缩格式(`compress_codec=none` 且 `archive_compress_codec=none`)。
+
+- `text`/`csv`/`json`:按 `file_split_size` 拆分并对齐到下一个 
`row_delimiter`,避免切开一行/一条记录。
+- `parquet`:以 RowGroup 为逻辑拆分单位,不会切开 RowGroup。
+
+**使用建议**
+- 适合:读取少量大文件,并希望通过更高并行度提升吞吐。
+- 不建议:读取大量小文件,或并行度较低的场景(拆分会带来额外的枚举/调度开销)。
+
+**限制说明**
+- 不支持压缩文件(`compress_codec` != `none`)或归档文件(`archive_compress_codec` != 
`none`),会自动回退为不拆分,并打印 WARN 日志提示。
+- 对于 `text`/`csv`/`json`,实际 split 的大小可能略大于 `file_split_size`(因为需要对齐到下一个 
`row_delimiter`)。
+- 对于 `json`,仅支持 JSON Lines(每行一个 JSON 对象)的切分读取。
+- 启用切分后,数据全局顺序不保证(split 可能并行处理导致输出顺序交错)。如需严格有序,请设置 `parallelism=1` 或关闭切分。
+
+### file_split_size [long]
+
+`enable_file_split=true` 时生效,单位字节。默认 128MB(134217728)。
+
+**调优建议**
+- 建议从默认值(128MB)开始:如果并行度未充分利用可适当调小;如果 split 数量过多可适当调大。
+- 经验公式:`file_split_size ≈ file_size / 期望并行度`。
+
 ### compress_codec [string]
 
 文件的压缩编解码器,支持的详细信息如下所示:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
index b88b64b31d..b026b92c2e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileBaseOptions.java
@@ -162,5 +162,7 @@ public class FileBaseOptions extends ConnectorCommonOptions 
{
                     .longType()
                     .defaultValue(128 * 1024 * 1024L)
                     .withDescription(
-                            "File split size, which can be filled in when the 
enable_file_split parameter is true. The unit is the number of bytes. The 
default value is the number of bytes of 128MB, which is 128*1024*1024.");
+                            "File split size in bytes when 
enable_file_split=true. Must be greater than 0. "
+                                    + "For text-like formats, the split end 
will be aligned to the next row_delimiter. "
+                                    + "Default is 128MB (128*1024*1024).");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 665c2670ed..a049d9fa55 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -106,13 +106,14 @@ public class CsvReadStrategy extends AbstractReadStrategy 
{
                 currentFileName,
                 split.getStart(),
                 split.getLength());
+        final boolean useSplitRead = isSplitReadEnabled(split);
         try (BOMInputStream bomIn = new 
BOMInputStream(wrapInputStream(inputStream, split));
                 BufferedReader reader =
                         new BufferedReader(new InputStreamReader(bomIn, 
getCharset(bomIn)));
                 CSVParser csvParser = new CSVParser(reader, 
getCSVFormat(split))) {
             // skip lines
-            // if enableSplitFile is true,no need to skip
-            if (!enableSplitFile) {
+            // if split range is used, no need to skip
+            if (!useSplitRead) {
                 for (int i = 0; i < skipHeaderNumber; i++) {
                     if (reader.readLine() == null) {
                         throw new IOException(
@@ -197,7 +198,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                 break;
         }
         // rebuild inputStream
-        if (enableSplitFile && split.getLength() > -1) {
+        if (isSplitReadEnabled(split)) {
             resultStream = safeSlice(resultStream, split.getStart(), 
split.getLength());
         }
         return resultStream;
@@ -209,6 +210,10 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                 : Charset.forName(bomIn.getBOM().getCharsetName());
     }
 
+    private boolean isSplitReadEnabled(FileSourceSplit split) {
+        return enableSplitFile && split.getLength() > -1;
+    }
+
     private CSVFormat getCSVFormat(FileSourceSplit split) {
         String quoteChar = 
readonlyConfig.get(FileBaseSourceOptions.QUOTE_CHAR);
         String escapeChar = 
readonlyConfig.get(FileBaseSourceOptions.ESCAPE_CHAR);
@@ -221,8 +226,9 @@ public class CsvReadStrategy extends AbstractReadStrategy {
             builder.setEscape(escapeChar.charAt(0));
         }
         CSVFormat csvFormat = builder.build();
-        // if enableSplitFile is true,no need to skip
-        if (firstLineAsHeader && (!enableSplitFile || split.getStart() == 0)) {
+        final boolean useSplitRead = isSplitReadEnabled(split);
+        // if split range is used, header should only be read in the first 
split
+        if (firstLineAsHeader && (!useSplitRead || split.getStart() == 0)) {
             csvFormat = csvFormat.withFirstRecordAsHeader();
         }
         return csvFormat;
@@ -230,7 +236,8 @@ public class CsvReadStrategy extends AbstractReadStrategy {
 
     private List<String> getHeaders(CSVParser csvParser, FileSourceSplit 
split) {
         List<String> headers;
-        if (firstLineAsHeader && (!enableSplitFile || split.getStart() == 0)) {
+        final boolean useSplitRead = isSplitReadEnabled(split);
+        if (firstLineAsHeader && (!useSplitRead || split.getStart() == 0)) {
             headers = new ArrayList<>(csvParser.getHeaderNames());
         } else {
             headers =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 074054c7e8..fa45521fbb 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -205,7 +205,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
                 break;
         }
         // rebuild inputStream
-        if (enableSplitFile && split.getLength() > -1) {
+        final boolean useSplitRead = enableSplitFile && split.getLength() > -1;
+        if (useSplitRead) {
             actualInputStream = safeSlice(inputStream, split.getStart(), 
split.getLength());
         }
         try (BufferedReader reader =
@@ -220,7 +221,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
                         }
                     };
             StreamLineSplitter splitter;
-            if (enableSplitFile) {
+            if (useSplitRead) {
                 splitter = new StreamLineSplitter(rowDelimiter, 0, 
lineProcessor);
             } else {
                 splitter = new StreamLineSplitter(rowDelimiter, 
skipHeaderNumber, lineProcessor);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
index ca9a60e979..1dafa51b7e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/AccordingToSplitSizeSplitStrategy.java
@@ -65,7 +65,9 @@ public class AccordingToSplitSizeSplitStrategy implements 
FileSplitStrategy, Clo
         if (splitSize <= 0) {
             throw new SeaTunnelRuntimeException(
                     FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
-                    "SplitSizeBytes must be greater than 0");
+                    String.format(
+                            "file_split_size must be greater than 0 when 
enable_file_split=true, but got: %d",
+                            splitSize));
         }
         if (rowDelimiter == null || rowDelimiter.isEmpty()) {
             throw new SeaTunnelRuntimeException(
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
index a801927e9d..c0d6bb42e5 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactory.java
@@ -17,16 +17,21 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.split;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Objects;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions.DEFAULT_ROW_DELIMITER;
 
+@Slf4j
 public class FileSplitStrategyFactory {
 
     public static FileSplitStrategy initFileSplitStrategy(
@@ -34,12 +39,24 @@ public class FileSplitStrategyFactory {
         if (!readonlyConfig.get(FileBaseSourceOptions.ENABLE_FILE_SPLIT)) {
             return new DefaultFileSplitStrategy();
         }
-        if 
(!readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE).supportFileSplit())
 {
+        FileFormat fileFormat = 
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE);
+        if (!fileFormat.supportFileSplit()) {
+            log.warn(
+                    "enable_file_split=true but file_format_type={} does not 
support file split. "
+                            + "Falling back to non-splitting mode.",
+                    fileFormat);
             return new DefaultFileSplitStrategy();
         }
-        if (readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC) != 
CompressFormat.NONE
-                || 
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC)
-                        != ArchiveCompressFormat.NONE) {
+        CompressFormat compressCodec = 
readonlyConfig.get(FileBaseSourceOptions.COMPRESS_CODEC);
+        ArchiveCompressFormat archiveCompressCodec =
+                
readonlyConfig.get(FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC);
+        if (compressCodec != CompressFormat.NONE
+                || archiveCompressCodec != ArchiveCompressFormat.NONE) {
+            log.warn(
+                    "enable_file_split=true but compress_codec={} or 
archive_compress_codec={} is not NONE. "
+                            + "Falling back to non-splitting mode.",
+                    compressCodec,
+                    archiveCompressCodec);
             return new DefaultFileSplitStrategy();
         }
 
@@ -47,7 +64,14 @@ public class FileSplitStrategyFactory {
                 hadoopConf, "hadoopConf must not be null when file split is 
enabled");
 
         long fileSplitSize = 
readonlyConfig.get(FileBaseSourceOptions.FILE_SPLIT_SIZE);
-        if (FileFormat.PARQUET == 
readonlyConfig.get(FileBaseSourceOptions.FILE_FORMAT_TYPE)) {
+        if (fileSplitSize <= 0) {
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
+                    String.format(
+                            "file_split_size must be greater than 0 when 
enable_file_split=true, but got: %d",
+                            fileSplitSize));
+        }
+        if (FileFormat.PARQUET == fileFormat) {
             return new ParquetFileSplitStrategy(fileSplitSize, hadoopConf);
         }
         String rowDelimiter =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
index 93a12d3213..b89bddfa53 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/ParquetFileSplitStrategy.java
@@ -58,7 +58,9 @@ public class ParquetFileSplitStrategy implements 
FileSplitStrategy, Closeable {
         if (splitSizeBytes <= 0) {
             throw new SeaTunnelRuntimeException(
                     FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
-                    "SplitSizeBytes must be greater than 0");
+                    String.format(
+                            "file_split_size must be greater than 0 when 
enable_file_split=true, but got: %d",
+                            splitSizeBytes));
         }
         this.splitSizeBytes = splitSizeBytes;
         this.hadoopFileSystemProxy = null;
@@ -68,7 +70,9 @@ public class ParquetFileSplitStrategy implements 
FileSplitStrategy, Closeable {
         if (splitSizeBytes <= 0) {
             throw new SeaTunnelRuntimeException(
                     FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL,
-                    "SplitSizeBytes must be greater than 0");
+                    String.format(
+                            "file_split_size must be greater than 0 when 
enable_file_split=true, but got: %d",
+                            splitSizeBytes));
         }
         this.splitSizeBytes = splitSizeBytes;
         this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
@@ -79,7 +83,12 @@ public class ParquetFileSplitStrategy implements 
FileSplitStrategy, Closeable {
         try {
             return splitByRowGroups(tableId, filePath, 
readRowGroups(filePath));
         } catch (IOException e) {
-            throw new 
SeaTunnelRuntimeException(FileConnectorErrorCode.FILE_SPLIT_FAIL, e);
+            throw new SeaTunnelRuntimeException(
+                    FileConnectorErrorCode.FILE_SPLIT_FAIL,
+                    String.format(
+                            "Split parquet file for [%s] failed, cause=%s: %s",
+                            filePath, e.getClass().getSimpleName(), 
e.getMessage()),
+                    e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
new file mode 100644
index 0000000000..c791442f4a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategySplitFallbackTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ReadStrategySplitFallbackTest {
+
+    private static final class ListCollector implements 
Collector<SeaTunnelRow> {
+        private final List<SeaTunnelRow> rows;
+        private final Object checkpointLock = new Object();
+
+        private ListCollector(List<SeaTunnelRow> rows) {
+            this.rows = rows;
+        }
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            rows.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return checkpointLock;
+        }
+    }
+
+    @Test
+    void 
testTextReadStrategyShouldSkipHeaderWhenEnableSplitButNoRangeInSplit() throws 
Exception {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/test");
+        configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        configMap.put(FileBaseSourceOptions.SKIP_HEADER_ROW_NUMBER.key(), 1L);
+        Config pluginConfig = ConfigFactory.parseMap(configMap);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"name"}, new SeaTunnelDataType[] 
{BasicType.STRING_TYPE});
+        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", 
rowType);
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        ListCollector collector = new ListCollector(rows);
+        FileSourceSplit split = new FileSourceSplit("test", 
"/tmp/test/e2e.txt");
+
+        try (TextReadStrategy strategy = new TextReadStrategy()) {
+            strategy.setPluginConfig(pluginConfig);
+            strategy.setCatalogTable(catalogTable);
+
+            strategy.readProcess(
+                    split,
+                    collector,
+                    new 
ByteArrayInputStream("name\na\n".getBytes(StandardCharsets.UTF_8)),
+                    Collections.emptyMap(),
+                    "e2e.txt");
+        }
+
+        Assertions.assertEquals(1, rows.size());
+        Assertions.assertEquals("a", rows.get(0).getField(0));
+    }
+
+    @Test
+    void testCsvReadStrategyShouldUseHeaderWhenEnableSplitButNoRangeInSplit() 
throws Exception {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FileBaseSourceOptions.FILE_PATH.key(), "/tmp/test");
+        configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        configMap.put(FileBaseSourceOptions.CSV_USE_HEADER_LINE.key(), true);
+        Config pluginConfig = ConfigFactory.parseMap(configMap);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name"},
+                        new SeaTunnelDataType[] {BasicType.INT_TYPE, 
BasicType.STRING_TYPE});
+        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("test", 
rowType);
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        ListCollector collector = new ListCollector(rows);
+        FileSourceSplit split = new FileSourceSplit("test", 
"/tmp/test/e2e.csv");
+
+        try (CsvReadStrategy strategy = new CsvReadStrategy()) {
+            strategy.setPluginConfig(pluginConfig);
+            strategy.setCatalogTable(catalogTable);
+
+            strategy.readProcess(
+                    split,
+                    collector,
+                    new 
ByteArrayInputStream("id,name\n1,a\n".getBytes(StandardCharsets.UTF_8)),
+                    Collections.emptyMap(),
+                    "e2e.csv");
+        }
+
+        Assertions.assertEquals(1, rows.size());
+        Assertions.assertEquals(1, rows.get(0).getField(0));
+        Assertions.assertEquals("a", rows.get(0).getField(1));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
new file mode 100644
index 0000000000..7fed69435a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSplitStrategyFactoryTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class FileSplitStrategyFactoryTest {
+
+    @Test
+    void shouldThrowWhenSplitSizeIsNonPositive() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
+        configMap.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.NONE);
+        configMap.put(
+                FileBaseSourceOptions.ARCHIVE_COMPRESS_CODEC.key(), 
ArchiveCompressFormat.NONE);
+        configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+        HadoopConf hadoopConf = new HadoopConf("file:///");
+
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                FileSplitStrategyFactory.initFileSplitStrategy(
+                                        readonlyConfig, hadoopConf));
+        Assertions.assertEquals(
+                FileConnectorErrorCode.FILE_SPLIT_SIZE_ILLEGAL, 
exception.getSeaTunnelErrorCode());
+        
Assertions.assertTrue(exception.getMessage().contains("file_split_size"));
+    }
+
+    @Test
+    void shouldFallbackToDefaultWhenCompressed() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.TEXT);
+        configMap.put(FileBaseSourceOptions.COMPRESS_CODEC.key(), 
CompressFormat.LZO);
+        configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+        FileSplitStrategy strategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig, 
null);
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, strategy);
+    }
+
+    @Test
+    void shouldFallbackToDefaultWhenFormatNotSupportSplit() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(FileBaseSourceOptions.ENABLE_FILE_SPLIT.key(), true);
+        configMap.put(FileBaseSourceOptions.FILE_FORMAT_TYPE.key(), 
FileFormat.ORC);
+        configMap.put(FileBaseSourceOptions.FILE_SPLIT_SIZE.key(), 0L);
+
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+
+        FileSplitStrategy strategy =
+                FileSplitStrategyFactory.initFileSplitStrategy(readonlyConfig, 
null);
+        Assertions.assertInstanceOf(DefaultFileSplitStrategy.class, strategy);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 8663d95dfd..4e64c0c0b7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -28,7 +28,11 @@ import java.util.List;
 public class S3FileSource extends BaseMultipleTableFileSource {
 
     public S3FileSource(ReadonlyConfig readonlyConfig, List<CatalogTable> 
catalogTablesFromConfig) {
-        super(new MultipleTableS3FileSourceConfig(readonlyConfig, 
catalogTablesFromConfig));
+        this(new MultipleTableS3FileSourceConfig(readonlyConfig, 
catalogTablesFromConfig));
+    }
+
+    private S3FileSource(MultipleTableS3FileSourceConfig sourceConfig) {
+        super(sourceConfig, initFileSplitStrategy(sourceConfig));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 66af2fd81d..303ec74d39 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -93,6 +93,18 @@ public class S3FileSourceFactory implements 
TableSourceFactory {
                         Arrays.asList(
                                 FileFormat.TEXT, FileFormat.JSON, 
FileFormat.CSV, FileFormat.XML),
                         FileBaseSourceOptions.ENCODING)
+                .conditional(
+                        FileBaseSourceOptions.FILE_FORMAT_TYPE,
+                        Arrays.asList(
+                                FileFormat.TEXT,
+                                FileFormat.JSON,
+                                FileFormat.CSV,
+                                FileFormat.PARQUET),
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT)
+                .conditional(
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+                        Boolean.TRUE,
+                        FileBaseSourceOptions.FILE_SPLIT_SIZE)
                 .optional(FileBaseSourceOptions.PARSE_PARTITION_FROM_PATH)
                 .optional(FileBaseSourceOptions.DATE_FORMAT_LEGACY)
                 .optional(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
index f52a16e264..393f8dc7c3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/s3/S3FileFactoryTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.s3;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.Condition;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.s3.sink.S3FileSinkFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.s3.source.S3FileSourceFactory;
 
@@ -30,4 +36,69 @@ class S3FileFactoryTest {
         Assertions.assertNotNull((new S3FileSourceFactory()).optionRule());
         Assertions.assertNotNull((new S3FileSinkFactory()).optionRule());
     }
+
+    @Test
+    void sourceOptionRuleShouldContainFileSplitOptions() {
+        OptionRule rule = new S3FileSourceFactory().optionRule();
+        Assertions.assertTrue(
+                optionRuleContains(rule, 
FileBaseSourceOptions.ENABLE_FILE_SPLIT),
+                "S3File source optionRule should include enable_file_split");
+        Assertions.assertTrue(
+                optionRuleContains(rule, 
FileBaseSourceOptions.FILE_SPLIT_SIZE),
+                "S3File source optionRule should include file_split_size");
+
+        Assertions.assertTrue(
+                hasConditionalRequiredOption(
+                        rule,
+                        FileBaseSourceOptions.FILE_FORMAT_TYPE,
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT),
+                "S3File source optionRule should expose enable_file_split for 
split-capable formats");
+
+        Assertions.assertTrue(
+                hasConditionalRequiredOption(
+                        rule,
+                        FileBaseSourceOptions.ENABLE_FILE_SPLIT,
+                        FileBaseSourceOptions.FILE_SPLIT_SIZE),
+                "S3File source optionRule should expose file_split_size when 
enable_file_split=true");
+    }
+
+    private static boolean optionRuleContains(OptionRule rule, Option<?> 
option) {
+        if (rule.getOptionalOptions().contains(option)) {
+            return true;
+        }
+        return rule.getRequiredOptions().stream().anyMatch(ro -> 
ro.getOptions().contains(option));
+    }
+
+    private static boolean hasConditionalRequiredOption(
+            OptionRule rule, Option<?> conditionalOption, Option<?> 
requiredOption) {
+        return rule.getRequiredOptions().stream()
+                .filter(ro -> ro instanceof 
RequiredOption.ConditionalRequiredOptions)
+                .map(ro -> (RequiredOption.ConditionalRequiredOptions) ro)
+                .anyMatch(
+                        cro ->
+                                expressionContainsOption(cro.getExpression(), 
conditionalOption)
+                                        && 
cro.getRequiredOption().contains(requiredOption));
+    }
+
+    private static boolean expressionContainsOption(Expression expression, 
Option<?> option) {
+        Expression currentExpression = expression;
+        while (currentExpression != null) {
+            if (conditionContainsOption(currentExpression.getCondition(), 
option)) {
+                return true;
+            }
+            currentExpression = currentExpression.getNext();
+        }
+        return false;
+    }
+
+    private static boolean conditionContainsOption(Condition<?> condition, 
Option<?> option) {
+        Condition<?> currentCondition = condition;
+        while (currentCondition != null) {
+            if (currentCondition.getOption().equals(option)) {
+                return true;
+            }
+            currentCondition = currentCondition.getNext();
+        }
+        return false;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
index 546baf06a3..5f5e587aa6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/s3/S3FileWithFilterIT.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.e2e.connector.file.s3;
 import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -38,6 +39,14 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.nio.file.Paths;
 
+/**
+ * MinIO-based S3 E2E test suite for connector-file-s3, covering:
+ *
+ * <ul>
+ *   <li>file filter by path/name pattern
+ *   <li>logical file split (enable_file_split/file_split_size) for parallel 
read
+ * </ul>
+ */
 @Slf4j
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class S3FileWithFilterIT extends SeaTunnelContainer {
@@ -79,7 +88,9 @@ public class S3FileWithFilterIT extends SeaTunnelContainer {
     }
 
     @Override
+    @AfterAll
     public void tearDown() throws Exception {
+        super.tearDown();
         if (s3Container != null) {
             s3Container.close();
         }
@@ -143,4 +154,15 @@ public class S3FileWithFilterIT extends SeaTunnelContainer 
{
                 executeJob("/json/s3_to_access_for_json_name_filter.conf");
         Assertions.assertEquals(0, execNameResult.getExitCode());
     }
+
+    @Test
+    public void testS3FileTextEnableSplitToAssert() throws IOException, 
InterruptedException {
+        S3Utils.uploadTestFiles(
+                "/text/e2e_split_with_header.txt",
+                "/test/seatunnel/read/split/text/e2e_split_with_header.txt",
+                true);
+        Container.ExecResult execResult =
+                executeJob("/text/s3_file_text_enable_split_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
new file mode 100644
index 0000000000..6c9c22a326
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/e2e_split_with_header.txt
@@ -0,0 +1,6 @@
+name
+a
+b
+c
+d
+e
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
new file mode 100644
index 0000000000..5842447fb7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-s3-e2e/src/test/resources/text/s3_file_text_enable_split_to_assert.conf
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = "local"
+}
+
+source {
+  S3File {
+    fs.s3a.endpoint = "http://s3:9000";
+    hadoop_s3_properties = {
+      "fs.s3a.path.style.access" = "true"
+      "fs.s3a.statistics.enable" = "false"
+    }
+    fs.s3a.aws.credentials.provider = 
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
+    access_key = "minioadmin"
+    secret_key = "minioadmin"
+    bucket = "s3a://ws-package"
+    path = "/test/seatunnel/read/split/text"
+    file_format_type = "text"
+
+    enable_file_split = true
+    file_split_size = 5
+
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        name = string
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules = {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ]
+      field_rules = [
+        {
+          field_name = "name"
+          field_type = "string"
+          field_value = [
+            { rule_type = NOT_NULL }
+          ]
+        }
+      ]
+    }
+  }
+}

Reply via email to