This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dbe41e74cd [Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm (#9002)
dbe41e74cd is described below

commit dbe41e74cd382e3843a8858ceb1e78baa0a99f31
Author: chenhongyu <2795267...@qq.com>
AuthorDate: Wed Mar 26 10:55:44 2025 +0800

    [Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm (#9002)
    
    Co-authored-by: chenhongyu05 <chenhongy...@corp.netease.com>
---
 docs/en/connector-v2/source/Jdbc.md                |   8 +-
 .../seatunnel/jdbc/config/JdbcOptions.java         |  24 +-
 .../seatunnel/jdbc/config/JdbcSourceConfig.java    |   8 +-
 .../jdbc/config/JdbcSourceTableConfig.java         |   5 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  40 ++++
 .../jdbc/internal/dialect/db2/DB2Dialect.java      |   5 +
 .../internal/dialect/oracle/OracleDialect.java     |  20 ++
 .../internal/dialect/vertica/VerticaDialect.java   |  20 ++
 .../seatunnel/jdbc/source/ChunkSplitter.java       |  10 +
 .../jdbc/source/CollationBasedSplitter.java        | 196 ++++++++++++++++
 .../jdbc/source/DynamicChunkSplitter.java          | 251 ++++++++++++++++++---
 .../seatunnel/jdbc/source/FixedChunkSplitter.java  | 123 +++++++++-
 .../seatunnel/jdbc/source/JdbcSourceTable.java     |   5 +-
 .../{JdbcSourceTable.java => StringSplitMode.java} |  46 ++--
 .../jdbc/source/CharsetBasedSplitterTest.java      | 189 ++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcGreenplumIT.java |   1 +
 .../resources/jdbc_greenplum_source_and_sink.conf  |   2 +
 .../seatunnel/jdbc/JdbcMysqlSplitIT.java           | 159 +++++++++++++
 18 files changed, 1035 insertions(+), 77 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 1505c4895e..bc5e39e0a8 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -53,8 +53,8 @@ supports query SQL and can achieve projection effect.
 | partition_column                           | String  | No       | -               | The column name for split data. [...]
 | partition_upper_bound                      | Long    | No       | -               | The partition_column max value for scan, if not set SeaTunnel will query database get max value. [...]
 | partition_lower_bound                      | Long    | No       | -               | The partition_column min value for scan, if not set SeaTunnel will query database get min value. [...]
-| partition_num                              | Int     | No       | job parallelism | Not recommended for use, The correct approach is to control the number of split through `split.size`<br/> **Note:** This parameter takes effect only when using the `query` parameter. It does not take effect when using the `table_path` parameter. [...]
-| decimal_type_narrowing                     | Boolean | No       | true            | Decimal type narrowing, if true, the decimal type will be narrowed to the int or long type if without loss of precision. Only support for Oracle at now. Please refer to `decimal_type_narrowing` below [...]
+| partition_num                              | Int     | No       | job parallelism | Not recommended for use, The correct approach is to control the number of split through `split.size`<br/> **Note:** This parameter takes effect only when using the `query` parameter. It does not take effect when using the `table_path` parameter. [...]
+| decimal_type_narrowing                     | Boolean | No       | true            | Decimal type narrowing, if true, the decimal type will be narrowed to the int or long type if without loss of precision. Only support for Oracle at now. Please refer to `decimal_type_narrowing` below [...]
 | use_select_count                           | Boolean | No       | false           | Use select count for table count rather then other methods in dynamic chunk split stage. This is currently only available for jdbc-oracle.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table [...]
 | skip_analyze                               | Boolean | No       | false           | Skip the analysis of table count in dynamic chunk split stage. This is currently only available for jdbc-oracle.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently [...]
 | fetch_size                                 | Int     | No       | 0               | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value. [...]
@@ -62,12 +62,14 @@ supports query SQL and can achieve projection effect.
 | table_path                                 | String  | No       | -               | The path to the full path of table, you can use this configuration instead of `query`. <br/>examples: <br/>`- mysql: "testdb.table1" `<br/>`- oracle: "test_schema.table1" `<br/>`- sqlserver: "testdb.test_schema.table1"` <br/>`- postgresql: "testdb.test_schema.table1"`  <br/>`- iris: "test_schema.table1"` [...]
 | table_list                                 | Array   | No       | -               | The list of tables to be read, you can use this configuration instead of `table_path` [...]
 | where_condition                            | String  | No       | -               | Common row filter conditions for all tables/queries, must start with `where`. for example `where id > 100` [...]
-| split.size                                 | Int     | No       | 8096            | How many rows in one split, captured tables are split into multiple splits when read of table. **Note**: This parameter takes effect only when using the `table_path` parameter. It does not take effect when using the `query` parameter. [...]
+| split.size                                 | Int     | No       | 8096            | How many rows in one split, captured tables are split into multiple splits when read of table. **Note**: This parameter takes effect only when using the `table_path` parameter. It does not take effect when using the `query` parameter. [...]
 | split.even-distribution.factor.lower-bound | Double  | No       | 0.05            | Not recommended for use.<br/> The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the [...]
 | split.even-distribution.factor.upper-bound | Double  | No       | 100             | Not recommended for use.<br/> The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the [...]
 | split.sample-sharding.threshold            | Int     | No       | 1000            | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strat [...]
 | split.inverse-sampling.rate                | Int     | No       | 1000            | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is pref [...]
 | common-options                             |         | No       | -               | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. [...]
+| split.string_split_mode                    | String  | No       | sample          | Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios. [...]
+| split.string_split_mode_collate            | String  | No       | -               | Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used. [...]
 
 ### decimal_type_narrowing
 
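The `charset_based` mode documented above maps each string key into a number so that string ranges can be split like numeric ranges. A minimal, self-contained sketch of that idea follows; class and method names are ours, not the connector's, and the real implementation additionally handles case-insensitivity, padding position, and a database-derived character order:

```java
import java.math.BigInteger;

// Sketch of the charset_based idea: each character in the ASCII range 32-126
// becomes a digit (1..95, with 0 reserved for padding), so a string maps to
// an integer in base 96 and ranges of strings become ranges of integers.
public class CharsetSplitSketch {
    static final int RADIX = 96; // 95 printable ASCII chars + 1 padding slot

    // Pad/truncate to maxLength, then read the characters as base-96 digits.
    public static BigInteger encode(String s, int maxLength) {
        BigInteger value = BigInteger.ZERO;
        for (int i = 0; i < maxLength; i++) {
            int digit = i < s.length() ? s.charAt(i) - 31 : 0; // ' ' -> 1 ... '~' -> 95
            value = value.multiply(BigInteger.valueOf(RADIX)).add(BigInteger.valueOf(digit));
        }
        return value;
    }

    // Inverse mapping: base-96 digits back to characters, dropping the padding.
    public static String decode(BigInteger value, int maxLength) {
        char[] chars = new char[maxLength];
        for (int i = maxLength - 1; i >= 0; i--) {
            int digit = value.mod(BigInteger.valueOf(RADIX)).intValue();
            chars[i] = digit == 0 ? '\u0000' : (char) (digit + 31);
            value = value.divide(BigInteger.valueOf(RADIX));
        }
        return new String(chars).replace("\u0000", "");
    }
}
```

Because the mapping is order-preserving over the assumed charset, cutting the integer range into equal chunks yields usable string split boundaries.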
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 976650456b..ce1ac866cb 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
 
-import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 
@@ -171,14 +171,14 @@ public interface JdbcOptions {
                     .noDefaultValue()
                     .withDescription("partition column");
 
-    Option<BigDecimal> PARTITION_UPPER_BOUND =
+    Option<String> PARTITION_UPPER_BOUND =
             Options.key("partition_upper_bound")
-                    .bigDecimalType()
+                    .stringType()
                     .noDefaultValue()
                     .withDescription("partition upper bound");
-    Option<BigDecimal> PARTITION_LOWER_BOUND =
+    Option<String> PARTITION_LOWER_BOUND =
             Options.key("partition_lower_bound")
-                    .bigDecimalType()
+                    .stringType()
                     .noDefaultValue()
                     .withDescription("partition lower bound");
     Option<Integer> PARTITION_NUM =
@@ -225,4 +225,18 @@ public interface JdbcOptions {
                     .mapType()
                     .noDefaultValue()
+                    .withDescription("additional connection configuration parameters");
+
+    Option<StringSplitMode> STRING_SPLIT_MODE =
+            Options.key("split.string_split_mode")
+                    .enumType(StringSplitMode.class)
+                    .defaultValue(StringSplitMode.SAMPLE)
+                    .withDescription(
+                            "Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios.");
+
+    Option<String> STRING_SPLIT_MODE_COLLATE =
+            Options.key("split.string_split_mode_collate")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used.");
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 09cc92d70e..80d479d610 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;
 
 import lombok.Builder;
 import lombok.Data;
@@ -44,6 +45,10 @@ public class JdbcSourceConfig implements Serializable {
     private int splitInverseSamplingRate;
     private boolean decimalTypeNarrowing;
 
+    private StringSplitMode stringSplitMode;
+
+    private String stringSplitModeCollate;
+
     public static JdbcSourceConfig of(ReadonlyConfig config) {
         JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
         builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
@@ -55,7 +60,8 @@ public class JdbcSourceConfig implements Serializable {
                 config.getOptional(JdbcOptions.QUERY).isPresent()
                         && config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
         builder.useDynamicSplitter(!isOldVersion);
-
+        builder.stringSplitMode(config.get(JdbcOptions.STRING_SPLIT_MODE));
+        builder.stringSplitModeCollate(config.get(JdbcOptions.STRING_SPLIT_MODE_COLLATE));
         builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
         builder.splitEvenDistributionFactorUpperBound(
                 config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
index a3522e9c14..17c8d1790c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
@@ -27,7 +27,6 @@ import lombok.Data;
 import lombok.experimental.Tolerate;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -53,10 +52,10 @@ public class JdbcSourceTableConfig implements Serializable {
     private Integer partitionNumber;
 
     @JsonProperty("partition_lower_bound")
-    private BigDecimal partitionStart;
+    private String partitionStart;
 
     @JsonProperty("partition_upper_bound")
-    private BigDecimal partitionEnd;
+    private String partitionEnd;
 
     @JsonProperty("use_select_count")
     private Boolean useSelectCount;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 07fe42c07b..6ec44d92f8 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -814,4 +814,44 @@ public interface JdbcDialect extends Serializable {
     default String quotesDefaultValue(Object defaultValue) {
         return "'" + defaultValue + "'";
     }
+
+    default String getCollationSequence(Connection connection, String collate) {
+        StringBuilder sb = new StringBuilder();
+        String getDual = dualTable();
+        String baseQuery = "SELECT char_val FROM (";
+        StringBuilder unionQuery = new StringBuilder();
+        for (int i = 32; i <= 126; i++) {
+            if (i > 32) unionQuery.append(" UNION ALL ");
+            unionQuery.append("SELECT ? AS char_val ").append(getDual);
+        }
+        String sortedQuery =
+                baseQuery + unionQuery + ")  ndi_tmp_chars ORDER BY " + getCollateSql(collate);
+        log.info("sortedCollationQuery is " + sortedQuery);
+        PreparedStatement preparedStatement;
+        try {
+            preparedStatement = connection.prepareStatement(sortedQuery);
+            for (int i = 32; i <= 126; i++) {
+                log.debug("setString " + (i - 32) + " => " + (char) i);
+                preparedStatement.setString(i - 32 + 1, String.valueOf((char) i));
+            }
+
+            ResultSet resultSet = preparedStatement.executeQuery();
+            while (resultSet.next()) {
+                sb.append(resultSet.getString("char_val"));
+            }
+            return sb.toString();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    default String getCollateSql(String collate) {
+        String getCollate =
+                StringUtils.isNotBlank(collate) ? "char_val COLLATE " + collate : "char_val";
+        return getCollate;
+    }
+
+    default String dualTable() {
+        return "";
+    }
 }
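The `getCollationSequence` default method added above asks the database itself to sort the characters 32-126 under the target collation, so the resulting order matches the server's rules. As an illustrative client-side analogue only (not what the connector does), the same ordering step can be sketched with `java.text.Collator` standing in for the database collation:

```java
import java.text.Collator;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sort the printable ASCII characters 32-126 under a collation to obtain the
// ordered charset that the splitter later uses as its digit alphabet.
// java.text.Collator is a stand-in here; the real method runs a UNION ALL
// query of bound parameters ORDER BY the dialect's collate expression.
public class CollationSequenceSketch {
    public static String orderedCharset(Collator collator) {
        return IntStream.rangeClosed(32, 126)
                .mapToObj(i -> String.valueOf((char) i))
                .sorted(collator) // Collator implements Comparator<Object>
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // Each of the 95 probed characters appears exactly once in the result.
        System.out.println(orderedCharset(Collator.getInstance(Locale.ROOT)).length());
    }
}
```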
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index 5af57bf104..48f1f29875 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -98,4 +98,9 @@ public class DB2Dialect implements JdbcDialect {
 
         return Optional.of(mergeStatement);
     }
+
+    @Override
+    public String dualTable() {
+        return " FROM SYSIBM.SYSDUMMY1 ";
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 683de83e38..cc49ce6327 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -482,4 +482,24 @@ public class OracleDialect implements JdbcDialect {
             return rs.getString("NULLABLE").equals("Y");
         }
     }
+
+    @Override
+    public String dualTable() {
+        return " FROM dual ";
+    }
+
+    @Override
+    public String getCollateSql(String collate) {
+        if (StringUtils.isNotBlank(collate)) {
+            StringBuilder sql = new StringBuilder();
+            sql.append("NLSSORT(")
+                    .append("char_val")
+                    .append(", 'NLS_SORT=")
+                    .append(collate)
+                    .append("')");
+            return sql.toString();
+        } else {
+            return "char_val";
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
index ccb889bf14..ac970d1564 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -107,4 +109,22 @@ public class VerticaDialect implements JdbcDialect {
 
         return Optional.of(upsertSQL);
     }
+
+    /**
+     * <a href="https://docs.vertica.com/23.4.x/en/sql-reference/functions/data-type-specific-functions/string-functions/collation/">vertica-collation</a>
+     *
+     * @param collate
+     * @return
+     */
+    @Override
+    public String getCollateSql(String collate) {
+        if (StringUtils.isNotBlank(collate)) {
+            StringBuilder sql = new StringBuilder();
+            sql.append("COLLATION(").append("char_val").append(", '").append(collate).append("')");
+            return sql.toString();
+        } else {
+            return "char_val";
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index f4da0a8d94..c41f275f6a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -85,6 +85,16 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         }
     }
 
+    protected static String filterOutUppercase(String str) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : str.toCharArray()) {
+            if (!Character.isUpperCase(c)) {
+                sb.append(c);
+            }
+        }
+        return sb.toString();
+    }
+
     public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
         log.info("Start splitting table {} into chunks...", table.getTablePath());
         long start = System.currentTimeMillis();
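Downstream of the splitter classes changed by this commit, once a string partition column has been encoded into `BigInteger` space, computing split boundaries is plain arithmetic; each boundary can then be decoded back into a string for the split's WHERE clause. A hedged sketch of that step (names are ours, not the connector's):

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Cut an encoded [min, max] range into `splits` even chunks and return the
// interior boundary values; callers would decode each boundary back into a
// string split point.
public class RangeBoundarySketch {
    public static List<BigInteger> boundaries(BigInteger min, BigInteger max, int splits) {
        BigInteger step = max.subtract(min).divide(BigInteger.valueOf(splits));
        List<BigInteger> result = new ArrayList<>();
        for (int i = 1; i < splits; i++) {
            result.add(min.add(step.multiply(BigInteger.valueOf(i))));
        }
        return result;
    }
}
```

For example, a range of 0..100 cut into 4 splits yields boundaries 25, 50, and 75.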
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java
new file mode 100644
index 0000000000..eb98a28a4c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+
+@Slf4j
+public class CollationBasedSplitter {
+
+    public static BigInteger encodeStringToNumericRange(
+            String str,
+            int maxLength,
+            boolean paddingAtEnd,
+            boolean isCaseInsensitive,
+            String orderedCharset,
+            int radix) {
+        log.info(
+                "Converting string '{}' to BigInteger, maxLength={}, isCaseInsensitive={}",
+                str,
+                maxLength,
+                isCaseInsensitive);
+        String asciiString =
+                stringToAsciiString(
+                        str, maxLength, paddingAtEnd, isCaseInsensitive, orderedCharset);
+        log.info("String converted to ASCII representation: {}", asciiString);
+        int[] baseArray = parseBaseNumber(asciiString);
+        log.info("ASCII representation parsed to base array: {}", Arrays.toString(baseArray));
+        BigInteger result = toDecimal(baseArray, radix);
+        log.info("Final BigInteger result: {}", result);
+        return result;
+    }
+
+    public static String decodeNumericRangeToString(
+            String bigInteger, int maxLength, int radix, String orderedCharset) {
+        log.info(
+                "Converting BigInteger '{}' to string, maxLength={}, radix={}",
+                bigInteger,
+                maxLength,
+                radix);
+        int[] baseArray = fromDecimal(new BigInteger(bigInteger), maxLength, radix);
+        log.info("BigInteger converted to base array: {}", Arrays.toString(baseArray));
+        String formattedNumber = formatBaseNumber(baseArray);
+        log.info("Base array formatted as number string: {}", formattedNumber);
+        String result = convertToAsciiString(formattedNumber, orderedCharset);
+        log.info("Final string result: '{}'", result);
+        return result;
+    }
+
+    private static int[] parseBaseNumber(String numberStr) {
+        log.trace("Parsing base number from string: {}", numberStr);
+        String[] parts = numberStr.split(" ");
+        int[] result = new int[parts.length];
+        for (int i = 0; i < parts.length; i++) {
+            result[i] = Integer.parseInt(parts[i]);
+        }
+        log.trace("Parsed base number result: {}", Arrays.toString(result));
+        return result;
+    }
+
+    private static String formatBaseNumber(int[] number) {
+        log.trace("Formatting base number array: {}", Arrays.toString(number));
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < number.length; i++) {
+            if (i > 0) sb.append(" ");
+            sb.append(String.format("%03d", number[i]));
+        }
+        String result = sb.toString();
+        log.trace("Formatted base number: {}", result);
+        return result;
+    }
+
+    private static int charToIndex(char c, String supportedChars) {
+        int result = (c == '\u0000') ? 0 : supportedChars.indexOf(c) + 1;
+        log.trace("Char '{}' converted to index: {}", c, result);
+        return result;
+    }
+
+    private static char indexToChar(int index, String supportedChars) {
+        char result = (index == 0) ? '\u0001' : supportedChars.charAt(index - 1);
+        log.trace("Index {} converted to char: '{}'", index, result);
+        return result;
+    }
+
+    private static BigInteger toDecimal(int[] array, int radix) {
+        log.trace(
+                "Converting array {} to decimal with charset size {}",
+                Arrays.toString(array),
+                radix);
+        BigInteger result = BigInteger.ZERO;
+        for (int i = 0; i < array.length; i++) {
+            BigInteger value = BigInteger.valueOf(array[i]);
+            BigInteger multiplier = BigInteger.valueOf(radix).pow(array.length - 1 - i);
+            result = result.add(value.multiply(multiplier));
+        }
+        log.trace("Decimal conversion result: {}", result);
+        return result;
+    }
+
+    private static int[] fromDecimal(BigInteger decimal, int length, int base) {
+        log.trace("Converting decimal {} to base {} array of length {}", decimal, base, length);
+        int[] result = new int[length];
+        BigInteger remainder = decimal;
+        for (int i = length - 1; i >= 0; i--) {
+            BigInteger divisor = BigInteger.valueOf(base).pow(i);
+            int value = remainder.divide(divisor).intValue();
+            remainder = remainder.mod(divisor);
+            result[length - 1 - i] = value;
+        }
+        log.trace("Base conversion result: {}", Arrays.toString(result));
+        return result;
+    }
+
+    private static String stringToAsciiString(
+            String s,
+            int expectedLength,
+            boolean paddingAtEnd,
+            boolean isCaseInsensitive,
+            String supportedChars) {
+        log.trace(
+                "Converting string '{}' to ASCII string, expectedLength={}, paddingAtEnd={}, isCaseInsensitive={}",
+                s,
+                expectedLength,
+                paddingAtEnd,
+                isCaseInsensitive);
+        String str = isCaseInsensitive ? s.toLowerCase() : s;
+        char[] paddedChars = new char[expectedLength];
+
+        if (paddingAtEnd) {
+            for (int i = 0; i < expectedLength; i++) {
+                if (i < str.length()) {
+                    paddedChars[i] = str.charAt(i);
+                } else {
+                    paddedChars[i] = '\u0000';
+                }
+            }
+            log.trace("Applied suffix padding to string");
+        } else {
+            int offset = expectedLength - str.length();
+            for (int i = 0; i < expectedLength; i++) {
+                if (i < offset) {
+                    paddedChars[i] = '\u0000';
+                } else {
+                    paddedChars[i] = str.charAt(i - offset);
+                }
+            }
+            log.trace("Applied prefix padding to string");
+        }
+
+        StringBuilder result = new StringBuilder();
+        for (int i = 0; i < paddedChars.length; i++) {
+            if (i > 0) result.append(" ");
+            result.append(String.format("%03d", charToIndex(paddedChars[i], supportedChars)));
+        }
+        String asciiResult = result.toString();
+        log.trace("ASCII string conversion result: {}", asciiResult);
+        return asciiResult;
+    }
+
+    private static String convertToAsciiString(String input, String supportedChars) {
+        log.trace("Converting ASCII representation '{}' back to string", input);
+        String[] asciiValues = input.split(" ");
+        StringBuilder result = new StringBuilder();
+
+        for (String value : asciiValues) {
+            char c = indexToChar(Integer.parseInt(value), supportedChars);
+            result.append(c);
+        }
+
+        String resultString = result.toString();
+        if (resultString.replaceAll("\u0001", "").isEmpty()) {
+            log.trace("Detected all placeholder characters, returning empty string");
+            return "";
+        } else {
+            log.trace("ASCII to string conversion result: '{}'", resultString);
+            return resultString;
+        }
+    }
+}
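The CollationBasedSplitter hunk above is, at its core, a positional-notation encoding: each character maps to its 1-based index in an ordered charset, index 0 is reserved for end-padding (so shorter strings sort below their extensions), and the padded index sequence is read as a number in base `charset.length() + 1`. A minimal standalone sketch of that idea follows; the class name and method signatures are illustrative, not the connector's actual API, and it omits the case-insensitive and space-separated intermediate forms the commit uses.

```java
import java.math.BigInteger;

public class CharsetRangeDemo {

    /**
     * Encode a string as a BigInteger in base (charset.length() + 1),
     * right-padding to maxLength with reserved index 0. Every character
     * of s is assumed to be present in the ordered charset.
     */
    static BigInteger encode(String s, int maxLength, String charset) {
        int radix = charset.length() + 1;
        BigInteger result = BigInteger.ZERO;
        for (int i = 0; i < maxLength; i++) {
            // 1-based charset index; 0 means "padding past end of string"
            int idx = i < s.length() ? charset.indexOf(s.charAt(i)) + 1 : 0;
            result = result.multiply(BigInteger.valueOf(radix)).add(BigInteger.valueOf(idx));
        }
        return result;
    }

    /** Decode a BigInteger back to a string, dropping padding positions. */
    static String decode(BigInteger value, int maxLength, String charset) {
        int radix = charset.length() + 1;
        char[] chars = new char[maxLength];
        for (int i = maxLength - 1; i >= 0; i--) {
            // peel digits off from the least significant (rightmost) position
            int idx = value.mod(BigInteger.valueOf(radix)).intValue();
            value = value.divide(BigInteger.valueOf(radix));
            chars[i] = idx == 0 ? '\u0000' : charset.charAt(idx - 1);
        }
        StringBuilder sb = new StringBuilder();
        for (char c : chars) {
            if (c != '\u0000') sb.append(c); // strip padding
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String charset = "0123456789abcdefghijklmnopqrstuvwxyz";
        BigInteger lo = encode("abc", 5, charset);
        BigInteger hi = encode("abd", 5, charset);
        // Encoding preserves the charset's ordering, so numeric midpoints
        // between lo and hi are valid split boundaries.
        System.out.println(lo.compareTo(hi) < 0); // true
        System.out.println(decode(lo, 5, charset)); // abc
    }
}
```

Because the mapping is order-preserving, the splitter can compute evenly spaced BigInteger boundaries between the encoded min and max, then decode each boundary back into a string usable in a `BETWEEN` predicate.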
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index d958c405df..e2bac89780 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -56,6 +57,9 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 @Slf4j
 public class DynamicChunkSplitter extends ChunkSplitter {
 
+    private final boolean useCharsetBasedStringSplitter =
+            StringSplitMode.CHARSET_BASED.equals(config.getStringSplitMode());
+
     public DynamicChunkSplitter(JdbcSourceConfig config) {
         super(config);
     }
@@ -124,8 +128,14 @@ public class DynamicChunkSplitter extends ChunkSplitter {
             case DECIMAL:
             case DOUBLE:
             case FLOAT:
-            case STRING:
                return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            case STRING:
+                if (useCharsetBasedStringSplitter) {
+                    return charsetBasedColumnSplitChunks(
+                            table, splitColumnName, min, max, chunkSize);
+                } else {
+                    return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+                }
             case DATE:
                return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
             default:
@@ -134,6 +144,94 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         }
     }
 
+    private List<ChunkRange> charsetBasedColumnSplitChunks(
+            JdbcSourceTable table,
+            String splitColumnName,
+            Object objectMin,
+            Object objectMax,
+            int chunkSize)
+            throws Exception {
+        boolean paddingAtEnd = true;
+        boolean isCaseInsensitive = false;
+        String collationSequence =
+                jdbcDialect.getCollationSequence(
+                        getOrEstablishConnection(), config.getStringSplitModeCollate());
+        if (collationSequence.matches(".*[aA][Aa].*")) {
+            isCaseInsensitive = true;
+            collationSequence = filterOutUppercase(collationSequence);
+        }
+        int radix = collationSequence.length() + 1;
+        String minStr = objectMin.toString();
+        String maxStr = objectMax.toString();
+        int maxLength = Math.max(minStr.length(), maxStr.length());
+        BigInteger min =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        minStr,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        BigInteger max =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        maxStr,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        TablePath tablePath = table.getTablePath();
+        double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
+        double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
+        int sampleShardingThreshold = config.getSplitSampleShardingThreshold();
+        log.info(
+                "Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
+                        + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
+                tablePath,
+                splitColumnName,
+                min,
+                max,
+                chunkSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                sampleShardingThreshold);
+
+        long approximateRowCnt = queryApproximateRowCnt(table);
+
+        double distributionFactor =
+                calculateDistributionFactor(tablePath, min, max, approximateRowCnt);
+
+        boolean dataIsEvenlyDistributed =
+                ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
+                        && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper)
+                                <= 0;
+
+        if (dataIsEvenlyDistributed) {
+            // the minimum dynamic chunk size is at least 1
+            final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
+            return splitStringEvenlySizedChunks(
+                    tablePath,
+                    min,
+                    max,
+                    approximateRowCnt,
+                    chunkSize,
+                    dynamicChunkSize,
+                    maxLength,
+                    radix,
+                    collationSequence);
+        } else {
+            return getChunkRangesWithUnevenlyData(
+                    table,
+                    splitColumnName,
+                    min,
+                    max,
+                    chunkSize,
+                    tablePath,
+                    sampleShardingThreshold,
+                    approximateRowCnt);
+        }
+    }
+
     private List<ChunkRange> evenlyColumnSplitChunks(
            JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
             throws Exception {
@@ -169,42 +267,63 @@ public class DynamicChunkSplitter extends ChunkSplitter {
             return splitEvenlySizedChunks(
                    tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
         } else {
-            int shardCount = (int) (approximateRowCnt / chunkSize);
-            int inverseSamplingRate = config.getSplitInverseSamplingRate();
-            if (sampleShardingThreshold < shardCount) {
-                // It is necessary to ensure that the number of data rows sampled by the
-                // sampling rate is greater than the number of shards.
-                // Otherwise, if the sampling rate is too low, it may result in an insufficient
-                // number of data rows for the shards, leading to an inadequate number of
-                // shards.
-                // Therefore, inverseSamplingRate should be less than chunkSize
-                if (inverseSamplingRate > chunkSize) {
-                    log.warn(
-                            "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
-                            inverseSamplingRate,
-                            chunkSize);
-                    inverseSamplingRate = chunkSize;
-                }
-                log.info(
-                        "Use sampling sharding for table {}, the sampling rate is {}",
-                        tablePath,
-                        inverseSamplingRate);
-                Object[] sample =
-                        jdbcDialect.sampleDataFromColumn(
-                                getOrEstablishConnection(),
-                                table,
-                                splitColumnName,
-                                inverseSamplingRate,
-                                config.getFetchSize());
-                log.info(
-                        "Sample data from table {} end, the sample size is {}",
-                        tablePath,
-                        sample.length);
-                return efficientShardingThroughSampling(
-                        tablePath, sample, approximateRowCnt, shardCount);
+            return getChunkRangesWithUnevenlyData(
+                    table,
+                    splitColumnName,
+                    min,
+                    max,
+                    chunkSize,
+                    tablePath,
+                    sampleShardingThreshold,
+                    approximateRowCnt);
+        }
+    }
+
+    private List<ChunkRange> getChunkRangesWithUnevenlyData(
+            JdbcSourceTable table,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize,
+            TablePath tablePath,
+            int sampleShardingThreshold,
+            long approximateRowCnt)
+            throws Exception {
+        int shardCount = (int) (approximateRowCnt / chunkSize);
+        int inverseSamplingRate = config.getSplitInverseSamplingRate();
+        if (sampleShardingThreshold < shardCount) {
+            // It is necessary to ensure that the number of data rows sampled by the
+            // sampling rate is greater than the number of shards.
+            // Otherwise, if the sampling rate is too low, it may result in an insufficient
+            // number of data rows for the shards, leading to an inadequate number of
+            // shards.
+            // Therefore, inverseSamplingRate should be less than chunkSize
+            if (inverseSamplingRate > chunkSize) {
+                log.warn(
+                        "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                        inverseSamplingRate,
+                        chunkSize);
+                inverseSamplingRate = chunkSize;
             }
-            return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
+            log.info(
+                    "Use sampling sharding for table {}, the sampling rate is {}",
+                    tablePath,
+                    inverseSamplingRate);
+            Object[] sample =
+                    jdbcDialect.sampleDataFromColumn(
+                            getOrEstablishConnection(),
+                            table,
+                            splitColumnName,
+                            inverseSamplingRate,
+                            config.getFetchSize());
+            log.info(
+                    "Sample data from table {} end, the sample size is {}",
+                    tablePath,
+                    sample.length);
+            return efficientShardingThroughSampling(
+                    tablePath, sample, approximateRowCnt, shardCount);
         }
+        return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
     }
 
     private Long queryApproximateRowCnt(JdbcSourceTable table) throws SQLException {
@@ -238,6 +357,68 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return distributionFactor;
     }
 
+    private List<ChunkRange> splitStringEvenlySizedChunks(
+            TablePath tablePath,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize,
+            int maxLength,
+            int radix,
+            String collationSequence) {
+        log.info(
+                "Use evenly-sized chunk optimization for table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}",
+                tablePath,
+                approximateRowCnt,
+                chunkSize,
+                dynamicChunkSize);
+        if (approximateRowCnt <= chunkSize) {
+            // there is no more than one chunk, return full table as a chunk
+            return Collections.singletonList(ChunkRange.all());
+        }
+
+        final List<ChunkRange> splits = new ArrayList<>();
+        Object chunkStart = null;
+        Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
+        while (ObjectUtils.compare(chunkEnd, max) <= 0) {
+            splits.add(
+                    ChunkRange.of(
+                            chunkStart == null
+                                    ? null
+                                    : CollationBasedSplitter.decodeNumericRangeToString(
+                                            chunkStart.toString(),
+                                            maxLength,
+                                            radix,
+                                            collationSequence),
+                            chunkEnd == null
+                                    ? null
+                                    : CollationBasedSplitter.decodeNumericRangeToString(
+                                            chunkEnd.toString(),
+                                            maxLength,
+                                            radix,
+                                            collationSequence)));
+            chunkStart = chunkEnd;
+            try {
+                chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize);
+            } catch (ArithmeticException e) {
+                // Stop chunk split to avoid dead loop when number overflows.
+                break;
+            }
+        }
+        // add the ending split
+        if (chunkStart != null) {
+            splits.add(
+                    ChunkRange.of(
+                            CollationBasedSplitter.decodeNumericRangeToString(
+                                    chunkStart.toString(), maxLength, radix, collationSequence),
+                            null));
+        } else {
+            splits.add(ChunkRange.of(null, null));
+        }
+        return splits;
+    }
+
     private List<ChunkRange> splitEvenlySizedChunks(
             TablePath tablePath,
             Object min,
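The boundary loop in `splitStringEvenlySizedChunks` above can be sketched in isolation: step from min toward max in fixed-size increments, emitting half-open boundary pairs whose first start and last end are left null (unbounded). `EvenChunkDemo` and its array-pair representation are assumptions for illustration, not the connector's `ChunkRange` API, and the overflow guard from the commit is omitted.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class EvenChunkDemo {

    /**
     * Split [min, max] into boundary pairs {start, end}; null means unbounded.
     * Produces {null, b1}, {b1, b2}, ..., {bn, null}.
     */
    static List<BigInteger[]> split(BigInteger min, BigInteger max, BigInteger step) {
        List<BigInteger[]> chunks = new ArrayList<>();
        BigInteger start = null;
        BigInteger end = min.add(step);
        while (end.compareTo(max) <= 0) {
            chunks.add(new BigInteger[] {start, end});
            start = end;
            end = end.add(step);
        }
        // final open-ended chunk covers everything past the last boundary
        chunks.add(new BigInteger[] {start, null});
        return chunks;
    }

    public static void main(String[] args) {
        for (BigInteger[] c : split(BigInteger.ZERO, BigInteger.TEN, BigInteger.valueOf(4))) {
            System.out.println((c[0] == null ? "-inf" : c[0]) + " .. " + (c[1] == null ? "+inf" : c[1]));
        }
        // -inf .. 4
        // 4 .. 8
        // 8 .. +inf
    }
}
```

In the charset-based path, each non-null boundary is additionally decoded back into a string before being handed to the split query.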
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
index 72a4e061ac..1509bdfc2b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -50,6 +51,9 @@ import java.util.List;
 @Slf4j
 public class FixedChunkSplitter extends ChunkSplitter {
 
+    private final boolean useCharsetBasedStringSplitter =
+            StringSplitMode.CHARSET_BASED.equals(config.getStringSplitMode());
+
     public FixedChunkSplitter(JdbcSourceConfig config) {
         super(config);
     }
@@ -71,11 +75,107 @@ public class FixedChunkSplitter extends ChunkSplitter {
             }
         }
         if (SqlType.STRING.equals(splitKeyType.getSqlType())) {
-            return createStringColumnSplits(table, splitKeyName, splitKeyType);
+            log.info("useNewStringSplitter is {}", useCharsetBasedStringSplitter);
+            if (useCharsetBasedStringSplitter) {
+                return getJdbcSourceStringSplits(table, splitKeyName, splitKeyType);
+            } else {
+                return createStringColumnSplits(table, splitKeyName, splitKeyType);
+            }
         }
+        return getJdbcSourceSplits(table, splitKeyName, splitKeyType);
+    }
 
-        BigDecimal partitionStart = table.getPartitionStart();
-        BigDecimal partitionEnd = table.getPartitionEnd();
+    private Collection<JdbcSourceSplit> getJdbcSourceStringSplits(
+            JdbcSourceTable table, String splitKeyName, SeaTunnelDataType splitKeyType)
+            throws SQLException {
+        String partitionStart = table.getPartitionStart();
+        String partitionEnd = table.getPartitionEnd();
+        if (partitionStart == null || partitionEnd == null) {
+            Pair<String, String> range = findSplitStringColumnRange(table, splitKeyName);
+            partitionStart = range.getLeft();
+            partitionEnd = range.getRight();
+        }
+        if (partitionStart == null || partitionEnd == null) {
+            JdbcSourceSplit split = createSingleSplit(table);
+            return Collections.singletonList(split);
+        }
+        boolean paddingAtEnd = true;
+        boolean isCaseInsensitive = false;
+        String collationSequence =
+                jdbcDialect.getCollationSequence(
+                        getOrEstablishConnection(), config.getStringSplitModeCollate());
+        if (collationSequence.matches(".*[aA][Aa].*")) {
+            isCaseInsensitive = true;
+            collationSequence = filterOutUppercase(collationSequence);
+        }
+        int radix = collationSequence.length() + 1;
+        int maxLength = Math.max(partitionStart.length(), partitionEnd.length());
+        BigInteger min =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        partitionStart,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        BigInteger max =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        partitionEnd,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        Collection<JdbcSourceSplit> numberColumnSplits =
+                createNumberColumnSplits(
+                        table,
+                        splitKeyName,
+                        splitKeyType,
+                        new BigDecimal(min),
+                        new BigDecimal(max));
+        if (CollectionUtils.isNotEmpty(numberColumnSplits)) {
+            List<JdbcSourceSplit> result = new ArrayList<>();
+            int index = 0;
+            for (JdbcSourceSplit jdbcSourceSplit : numberColumnSplits) {
+                result.add(
+                        new JdbcSourceSplit(
+                                jdbcSourceSplit.getTablePath(),
+                                jdbcSourceSplit.getSplitId(),
+                                jdbcSourceSplit.getSplitQuery(),
+                                jdbcSourceSplit.getSplitKeyName(),
+                                jdbcSourceSplit.getSplitKeyType(),
+                                index == 0
+                                        ? partitionStart
+                                        : CollationBasedSplitter.decodeNumericRangeToString(
+                                                jdbcSourceSplit.getSplitStart().toString(),
+                                                maxLength,
+                                                radix,
+                                                collationSequence),
+                                index == numberColumnSplits.size() - 1
+                                        ? partitionEnd
+                                        : CollationBasedSplitter.decodeNumericRangeToString(
+                                                jdbcSourceSplit.getSplitEnd().toString(),
+                                                maxLength,
+                                                radix,
+                                                collationSequence)));
+                index++;
+            }
+            return result;
+        }
+        return numberColumnSplits;
+    }
+
+    private Collection<JdbcSourceSplit> getJdbcSourceSplits(
+            JdbcSourceTable table, String splitKeyName, SeaTunnelDataType splitKeyType)
+            throws SQLException {
+        BigDecimal partitionStart =
+                StringUtils.isBlank(table.getPartitionStart())
+                        ? null
+                        : new BigDecimal(table.getPartitionStart());
+        BigDecimal partitionEnd =
+                StringUtils.isBlank(table.getPartitionEnd())
+                        ? null
+                        : new BigDecimal(table.getPartitionEnd());
         if (partitionStart == null || partitionEnd == null) {
            Pair<BigDecimal, BigDecimal> range = findSplitColumnRange(table, splitKeyName);
             partitionStart = range.getLeft();
@@ -93,7 +193,8 @@ public class FixedChunkSplitter extends ChunkSplitter {
     @Override
    protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema)
             throws SQLException {
-        if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())) {
+        if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())
+                && !useCharsetBasedStringSplitter) {
             return createStringColumnSplitStatement(split);
         }
         if (split.getSplitStart() == null && split.getSplitEnd() == null) {
@@ -244,6 +345,20 @@ public class FixedChunkSplitter extends ChunkSplitter {
         return statement;
     }
 
+    private Pair<String, String> findSplitStringColumnRange(
+            JdbcSourceTable table, String columnName) throws SQLException {
+        Pair<Object, Object> splitColumnRange = queryMinMax(table, columnName);
+        Object min = splitColumnRange.getLeft();
+        Object max = splitColumnRange.getRight();
+        if (min != null) {
+            min = min.toString();
+        }
+        if (max != null) {
+            max = max.toString();
+        }
+        return Pair.of(((String) min), ((String) max));
+    }
+
     private Pair<BigDecimal, BigDecimal> findSplitColumnRange(
             JdbcSourceTable table, String columnName) throws SQLException {
         Pair<Object, Object> splitColumnRange = queryMinMax(table, columnName);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java
index 8aad94c8b6..7144c9ed2d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java
@@ -24,7 +24,6 @@ import lombok.Builder;
 import lombok.Data;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
 
 @Data
 @Builder
@@ -35,8 +34,8 @@ public class JdbcSourceTable implements Serializable {
     private final String query;
     private final String partitionColumn;
     private final Integer partitionNumber;
-    private final BigDecimal partitionStart;
-    private final BigDecimal partitionEnd;
+    private final String partitionStart;
+    private final String partitionEnd;
     private final Boolean useSelectCount;
     private final Boolean skipAnalyze;
     private final CatalogTable catalogTable;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java
similarity index 55%
copy from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java
index 8aad94c8b6..e020394085 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java
@@ -17,27 +17,27 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.TablePath;
-
-import lombok.Builder;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-
-@Data
-@Builder
-public class JdbcSourceTable implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private final TablePath tablePath;
-    private final String query;
-    private final String partitionColumn;
-    private final Integer partitionNumber;
-    private final BigDecimal partitionStart;
-    private final BigDecimal partitionEnd;
-    private final Boolean useSelectCount;
-    private final Boolean skipAnalyze;
-    private final CatalogTable catalogTable;
+public enum StringSplitMode {
+    SAMPLE("sample"),
+
+    CHARSET_BASED("charset_based");
+
+    public boolean equals(String mode) {
+        return this.mode.equalsIgnoreCase(mode);
+    }
+
+    private final String mode;
+
+    StringSplitMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    @Override
+    public String toString() {
+        return mode;
+    }
 }
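The new `StringSplitMode` enum above matches configuration values case-insensitively through a String-parameter `equals` overload. A hedged sketch of how such a mode string could be resolved; `parseMode` and the `SAMPLE` fallback are illustrative assumptions, not the connector's actual option wiring.

```java
public class SplitModeDemo {

    enum StringSplitMode {
        SAMPLE("sample"),
        CHARSET_BASED("charset_based");

        private final String mode;

        StringSplitMode(String mode) {
            this.mode = mode;
        }

        /** Case-insensitive match against a configured value. */
        boolean matches(String value) {
            return mode.equalsIgnoreCase(value);
        }
    }

    /** Resolve a config string to a mode, falling back to SAMPLE (assumed default). */
    static StringSplitMode parseMode(String configured) {
        for (StringSplitMode m : StringSplitMode.values()) {
            if (m.matches(configured)) {
                return m;
            }
        }
        return StringSplitMode.SAMPLE;
    }

    public static void main(String[] args) {
        System.out.println(parseMode("CHARSET_BASED")); // CHARSET_BASED
        System.out.println(parseMode("sample")); // SAMPLE
    }
}
```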
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java
new file mode 100644
index 0000000000..c877e30827
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Slf4j
+public class CharsetBasedSplitterTest {
+
+    private static final String DEFAULT_CHARSET = "0123456789abcdefghijklmnopqrstuvwxyz";
+
+    @Test
+    @DisplayName("Test encoding of minimum and maximum values")
+    public void testMinMax() {
+        String minStr = "00000";
+        String maxStr = "1";
+        int maxLen = Math.max(minStr.length(), maxStr.length());
+        String orderedCharset = "012a34b56789";
+        BigInteger minBigInt =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        minStr, maxLen, true, true, orderedCharset, orderedCharset.length() + 1);
+        log.info("Minimum value encoding: " + minBigInt);
+
+        BigInteger maxBigInt =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        maxStr, maxLen, true, true, orderedCharset, orderedCharset.length() + 1);
+        log.info("Maximum value encoding: " + maxBigInt);
+
+        assert maxBigInt.compareTo(minBigInt) > 0;
+    }
+
+    @Test
+    @DisplayName("Test consistency of string encoding and decoding")
+    public void testEncodeDecode() {
+        String original = "abc123";
+        int maxLength = 10;
+        boolean paddingAtEnd = true;
+        boolean isCaseInsensitive = true;
+        int radix = DEFAULT_CHARSET.length() + 1;
+
+        BigInteger encoded =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        original,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        DEFAULT_CHARSET,
+                        radix);
+
+        String decoded =
+                CollationBasedSplitter.decodeNumericRangeToString(
+                        encoded.toString(), maxLength, radix, DEFAULT_CHARSET);
+
+        assertEquals(original.toLowerCase(), decoded.trim());
+    }
+
+    @Test
+    @DisplayName("Test charset with special characters")
+    public void testSpecialCharset() {
+        String customCharset = "!@#$%^&*()_+-=[]{}|;:,.<>?";
+        String input = "!@#$%";
+        int maxLength = 10;
+        int radix = customCharset.length() + 1;
+
+        BigInteger encoded =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        input, maxLength, true, false, customCharset, radix);
+
+        String decoded =
+                CollationBasedSplitter.decodeNumericRangeToString(
+                        encoded.toString(), maxLength, radix, customCharset);
+
+        assertEquals(input, decoded.trim());
+    }
+
+    @Test
+    @DisplayName("Test impact of different padding positions")
+    public void testPaddingPosition() {
+        String input = "xyz";
+        int maxLength = 5;
+        int radix = DEFAULT_CHARSET.length() + 1;
+
+        BigInteger encodedPrefix =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        input, maxLength, false, false, DEFAULT_CHARSET, radix);
+        String decodedPrefix =
+                CollationBasedSplitter.decodeNumericRangeToString(
+                        encodedPrefix.toString(), maxLength, radix, DEFAULT_CHARSET);
+
+        BigInteger encodedSuffix =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        input, maxLength, true, false, DEFAULT_CHARSET, radix);
+        String decodedSuffix =
+                CollationBasedSplitter.decodeNumericRangeToString(
+                        encodedSuffix.toString(), maxLength, radix, DEFAULT_CHARSET);
+
+        assertEquals(input, decodedPrefix.trim());
+        assertEquals(input, decodedSuffix.trim());
+
+        assert !encodedPrefix.equals(encodedSuffix);
+    }
+
+    @Test
+    @DisplayName("Test performance")
+    public void testPerformance() {
+        int iterations = 1000;
+        String input = "abcdefghijklmnopqrstuvwxyz";
+        int maxLength = 30;
+        int radix = DEFAULT_CHARSET.length() + 1;
+
+        long startTime = System.currentTimeMillis();
+
+        for (int i = 0; i < iterations; i++) {
+            BigInteger encoded =
+                    CollationBasedSplitter.encodeStringToNumericRange(
+                            input, maxLength, true, true, DEFAULT_CHARSET, radix);
+
+            String decoded =
+                    CollationBasedSplitter.decodeNumericRangeToString(
+                            encoded.toString(), maxLength, radix, DEFAULT_CHARSET);
+
+            assertEquals(input, decoded.trim());
+        }
+
+        long endTime = System.currentTimeMillis();
+        long duration = endTime - startTime;
+
+        log.info(
+                "Executing "
+                        + iterations
+                        + " encoding/decoding operations took: "
+                        + duration
+                        + " milliseconds");
+        log.info("Average time per operation: " + (double) duration / iterations + " milliseconds");
+    }
+
+    @Test
+    @DisplayName("Test encoding and decoding of random strings")
+    public void testRandomStrings() {
+        java.util.Random random = new java.util.Random();
+        int testCount = 10;
+        int maxLength = 20;
+        int radix = DEFAULT_CHARSET.length() + 1;
+        for (int test = 0; test < testCount; test++) {
+            int length = random.nextInt(maxLength) + 1;
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < length; i++) {
+                int charIndex = random.nextInt(DEFAULT_CHARSET.length());
+                sb.append(DEFAULT_CHARSET.charAt(charIndex));
+            }
+            String randomString = sb.toString();
+            BigInteger encoded =
+                    CollationBasedSplitter.encodeStringToNumericRange(
+                            randomString, maxLength, true, false, DEFAULT_CHARSET, radix);
+
+            String decoded =
+                    CollationBasedSplitter.decodeNumericRangeToString(
+                            encoded.toString(), maxLength, radix, DEFAULT_CHARSET);
+
+            log.info("Random string #" + test + ": " + randomString);
+            log.info("Encoding result: " + encoded);
+            log.info("Decoding result: " + decoded.trim());
+
+            assertEquals(randomString, decoded.trim());
+        }
+    }
+}
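The tests above exercise `CollationBasedSplitter.encodeStringToNumericRange` and `decodeNumericRangeToString`, whose implementation appears earlier in this patch and is not reproduced in this excerpt. As a rough mental model only (the names and details below are illustrative assumptions, not the committed implementation), the scheme can be sketched as positional base-(N+1) encoding over the ordered charset, with digit 0 reserved for padding:

```java
import java.math.BigInteger;

public class CharsetEncodingSketch {

    // Encode a string into a number in base (charset.length() + 1).
    // Each character maps to a digit in 1..N by its position in the ordered
    // charset; digit 0 is reserved so that padding (missing trailing
    // characters) sorts below every real character.
    static BigInteger encode(String s, int maxLen, String charset) {
        BigInteger base = BigInteger.valueOf(charset.length() + 1);
        BigInteger result = BigInteger.ZERO;
        for (int i = 0; i < maxLen; i++) {
            int digit = i < s.length() ? charset.indexOf(s.charAt(i)) + 1 : 0;
            result = result.multiply(base).add(BigInteger.valueOf(digit));
        }
        return result;
    }

    // Invert encode(): read digits from the least significant position,
    // skip the padding digit 0, and reverse to recover the string.
    static String decode(BigInteger n, int maxLen, String charset) {
        BigInteger base = BigInteger.valueOf(charset.length() + 1);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < maxLen; i++) {
            int digit = n.mod(base).intValue();
            n = n.divide(base);
            if (digit > 0) {
                sb.append(charset.charAt(digit - 1));
            }
        }
        return sb.reverse().toString();
    }

    public static void main(String[] args) {
        String charset = "0123456789abcdefghijklmnopqrstuvwxyz";
        BigInteger encoded = encode("abc123", 10, charset);
        System.out.println(decode(encoded, 10, charset)); // abc123
        // End-padded encoding preserves lexicographic order:
        System.out.println(
                encode("abc", 10, charset).compareTo(encode("abd", 10, charset)) < 0); // true
    }
}
```

With end-padding, a shorter string behaves as if padded with the lowest digit, so the numeric order of the encodings matches the lexicographic order induced by the charset, which is what makes the encoded range usable for computing split boundaries.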
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
index 428ace282b..11354ad3b5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java
@@ -85,6 +85,7 @@ public class JdbcGreenplumIT extends AbstractJdbcIT {
                 .insertSql(insertSql)
                 .testData(testDataSet)
                 .tablePathFullName(GREENPLUM_SOURCE)
+                .useSaveModeCreateTable(false)
                 .build();
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf
index 2f5314e67a..bfb5304157 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf
@@ -31,6 +31,8 @@ source {
     user = tester
     password = pivotal
     query = "select age, name from source"
+    partition_column = "name"
+    split.string_split_mode = charset_based
   }
 
  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
index 6dcc6e51ba..21567ac818 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.common.utils.JdbcUrlUtil;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.DynamicChunkSplitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.FixedChunkSplitter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
 import org.apache.seatunnel.e2e.common.TestResource;
@@ -538,6 +539,13 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
         return splitter;
     }
 
+    @NotNull private FixedChunkSplitter getFixedChunkSplitter(Map<String, Object> configMap) {
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap);
+        JdbcSourceConfig sourceConfig = JdbcSourceConfig.of(readonlyConfig);
+        FixedChunkSplitter splitter = new FixedChunkSplitter(sourceConfig);
+        return splitter;
+    }
+
     @Override
     public void tearDown() throws Exception {
         if (mysql_container != null) {
@@ -545,4 +553,155 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource {
             dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec();
         }
     }
+
+    @Test
+    public void testDynamicCharSplit() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get());
+        configMap.put("driver", "com.mysql.cj.jdbc.Driver");
+        configMap.put("user", MYSQL_USERNAME);
+        configMap.put("password", MYSQL_PASSWORD);
+        configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
+        configMap.put("split.size", "10");
+        configMap.put("split.string_split_mode", "charset_based");
+
+        TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
+        MySqlCatalog mySqlCatalog =
+                new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null);
+        mySqlCatalog.open();
+        Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql));
+        CatalogTable table = mySqlCatalog.getTable(tablePathMySql);
+
+        String[] charColumns = {
+            "c_char", "c_varchar", "c_tinytext", "c_text", "c_mediumtext", "c_longtext"
+        };
+
+        for (String charColumn : charColumns) {
+            try {
+                LOG.info("Testing split on character column: {}", charColumn);
+                configMap.put("partition_column", charColumn);
+                DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap);
+
+                JdbcSourceTable jdbcSourceTable =
+                        JdbcSourceTable.builder()
+                                .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+                                .catalogTable(table)
+                                .partitionColumn(charColumn)
+                                .build();
+
+                Collection<JdbcSourceSplit> jdbcSourceSplits =
+                        splitter.generateSplits(jdbcSourceTable);
+
+                LOG.info(
+                        "Split results for column {}: {} splits",
+                        charColumn,
+                        jdbcSourceSplits.size());
+                int splitIndex = 0;
+                for (JdbcSourceSplit split : jdbcSourceSplits) {
+                    LOG.info(
+                            "Split {}: key={}, start={}, end={}",
+                            splitIndex++,
+                            split.getSplitKeyName(),
+                            split.getSplitStart(),
+                            split.getSplitEnd());
+                }
+
+                if (!jdbcSourceSplits.isEmpty()) {
+                    JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new JdbcSourceSplit[0]);
+                    Assertions.assertEquals(charColumn, splitArray[0].getSplitKeyName());
+                    printCharSplitBoundaries(splitArray);
+                }
+            } catch (Exception e) {
+                LOG.error("Error splitting on column {}: {}", charColumn, e.getMessage(), e);
+            }
+        }
+
+        mySqlCatalog.close();
+    }
+
+    @Test
+    public void testFixedCharSplit() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get());
+        configMap.put("driver", "com.mysql.cj.jdbc.Driver");
+        configMap.put("user", MYSQL_USERNAME);
+        configMap.put("password", MYSQL_PASSWORD);
+        configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE);
+        configMap.put("split.string_split_mode", "charset_based");
+
+        TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE);
+        MySqlCatalog mySqlCatalog =
+                new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null);
+        mySqlCatalog.open();
+        Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql));
+        CatalogTable table = mySqlCatalog.getTable(tablePathMySql);
+
+        String[] charColumns = {
+            "c_bigint", "c_varchar", "c_tinytext", "c_text", "c_mediumtext", "c_longtext", "c_char"
+        };
+
+        for (String charColumn : charColumns) {
+            try {
+                LOG.info("Testing split on character column: {}", charColumn);
+                configMap.put("partition_column", charColumn);
+                FixedChunkSplitter splitter = getFixedChunkSplitter(configMap);
+
+                JdbcSourceTable jdbcSourceTable =
+                        JdbcSourceTable.builder()
+                                .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE))
+                                .catalogTable(table)
+                                .partitionColumn(charColumn)
+                                .partitionNumber(10)
+                                .build();
+
+                Collection<JdbcSourceSplit> jdbcSourceSplits =
+                        splitter.generateSplits(jdbcSourceTable);
+
+                LOG.info(
+                        "Split results for column {}: {} splits",
+                        charColumn,
+                        jdbcSourceSplits.size());
+                int splitIndex = 0;
+                for (JdbcSourceSplit split : jdbcSourceSplits) {
+                    LOG.info(
+                            "Split {}: key={}, start={}, end={}",
+                            splitIndex++,
+                            split.getSplitKeyName(),
+                            split.getSplitStart(),
+                            split.getSplitEnd());
+                }
+            } catch (Exception e) {
+                LOG.error("Error splitting on column {}: {}", charColumn, e.getMessage(), e);
+            }
+        }
+
+        mySqlCatalog.close();
+    }
+
+    private void printCharSplitBoundaries(JdbcSourceSplit[] splitArray) {
+        LOG.info("Character column split boundaries:");
+        for (int i = 0; i < splitArray.length; i++) {
+            Object start = splitArray[i].getSplitStart();
+            Object end = splitArray[i].getSplitEnd();
+
+            LOG.info(
+                    "Split {}: start={}, end={}",
+                    i,
+                    start == null ? "NULL" : "'" + start.toString() + "'",
+                    end == null ? "NULL" : "'" + end.toString() + "'");
+
+            if (i == 0) {
+                Assertions.assertNull(start, "First split should start with NULL");
+            }
+
+            if (i == splitArray.length - 1) {
+                Assertions.assertNull(end, "Last split should end with NULL");
+            }
+
+            if (i > 0 && i < splitArray.length - 1) {
+                Assertions.assertNotNull(start, "Middle split should have non-null start");
+                Assertions.assertNotNull(end, "Middle split should have non-null end");
+            }
+        }
+    }
 }
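The IT cases above log the start/end boundaries that the splitters produce for string columns. The committed DynamicChunkSplitter logic is not shown in this excerpt, but once the minimum and maximum partition-column values are mapped into a numeric range (as in the encoding sketch earlier), evenly spaced chunk boundaries fall out of plain BigInteger arithmetic. A hypothetical sketch, purely for illustration:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {

    // Given the encoded min/max of the partition column and a desired split
    // count, compute the interior boundary values by dividing the numeric
    // range into equal-width chunks. Each boundary would then be decoded
    // back to a string and used in the split's WHERE clause.
    static List<BigInteger> boundaries(BigInteger min, BigInteger max, int splits) {
        List<BigInteger> result = new ArrayList<>();
        BigInteger step = max.subtract(min).divide(BigInteger.valueOf(splits));
        for (int i = 1; i < splits; i++) {
            result.add(min.add(step.multiply(BigInteger.valueOf(i))));
        }
        return result;
    }

    public static void main(String[] args) {
        // Four splits over [0, 100] yield three interior boundaries.
        List<BigInteger> b = boundaries(BigInteger.ZERO, BigInteger.valueOf(100), 4);
        System.out.println(b); // [25, 50, 75]
    }
}
```

This also explains the boundary assertions in `printCharSplitBoundaries`: the first split is left-open (NULL start) and the last is right-open (NULL end), so rows below the minimum or above the maximum sampled value are still covered.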
