This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

The following commit(s) were added to refs/heads/dev by this push:
     new dbe41e74cd [Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm (#9002)
dbe41e74cd is described below

commit dbe41e74cd382e3843a8858ceb1e78baa0a99f31
Author: chenhongyu <2795267...@qq.com>
AuthorDate: Wed Mar 26 10:55:44 2025 +0800

    [Feature][Jdbc] Add String type column split Support by charset-based splitting algorithm (#9002)

    Co-authored-by: chenhongyu05 <chenhongy...@corp.netease.com>
---
 docs/en/connector-v2/source/Jdbc.md                |   8 +-
 .../seatunnel/jdbc/config/JdbcOptions.java         |  24 +-
 .../seatunnel/jdbc/config/JdbcSourceConfig.java    |   8 +-
 .../jdbc/config/JdbcSourceTableConfig.java         |   5 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  40 ++++
 .../jdbc/internal/dialect/db2/DB2Dialect.java      |   5 +
 .../internal/dialect/oracle/OracleDialect.java     |  20 ++
 .../internal/dialect/vertica/VerticaDialect.java   |  20 ++
 .../seatunnel/jdbc/source/ChunkSplitter.java       |  10 +
 .../jdbc/source/CollationBasedSplitter.java        | 196 ++++++++++++++++
 .../jdbc/source/DynamicChunkSplitter.java          | 251 ++++++++++++++++++---
 .../seatunnel/jdbc/source/FixedChunkSplitter.java  | 123 +++++++++-
 .../seatunnel/jdbc/source/JdbcSourceTable.java     |   5 +-
 .../{JdbcSourceTable.java => StringSplitMode.java} |  46 ++--
 .../jdbc/source/CharsetBasedSplitterTest.java      | 189 ++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcGreenplumIT.java |   1 +
 .../resources/jdbc_greenplum_source_and_sink.conf  |   2 +
 .../seatunnel/jdbc/JdbcMysqlSplitIT.java           | 159 +++++++++++++
 18 files changed, 1035 insertions(+), 77 deletions(-)

diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 1505c4895e..bc5e39e0a8 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -53,8 +53,8 @@ supports query SQL and can achieve projection effect.
 | partition_column | String | No | - | The column name for split data. [...]
 | partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. [...]
 | partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. [...]
-| partition_num | Int | No | job parallelism | Not recommended for use, The correct approach is to control the number of split through `split.size`<br/> **Note:** This parameter takes effect only when using the `query` parameter. It does not take effect when using the `table_path` parameter. [...]
-| decimal_type_narrowing | Boolean | No | true | Decimal type narrowing, if true, the decimal type will be narrowed to the int or long type if without loss of precision. Only support for Oracle at now. Please refer to `decimal_type_narrowing` below [...]
+| partition_num | Int | No | job parallelism | Not recommended for use, The correct approach is to control the number of split through `split.size`<br/> **Note:** This parameter takes effect only when using the `query` parameter. It does not take effect when using the `table_path` parameter. [...]
+| decimal_type_narrowing | Boolean | No | true | Decimal type narrowing, if true, the decimal type will be narrowed to the int or long type if without loss of precision. Only support for Oracle at now. Please refer to `decimal_type_narrowing` below [...]
 | use_select_count | Boolean | No | false | Use select count for table count rather then other methods in dynamic chunk split stage. This is currently only available for jdbc-oracle.In this scenario, select count directly is used when it is faster to update statistics using sql from analysis table [...]
 | skip_analyze | Boolean | No | false | Skip the analysis of table count in dynamic chunk split stage. This is currently only available for jdbc-oracle.In this scenario, you schedule analysis table sql to update related table statistics periodically or your table data does not change frequently [...]
 | fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value. [...]
@@ -62,12 +62,14 @@ supports query SQL and can achieve projection effect.
 | table_path | String | No | - | The path to the full path of table, you can use this configuration instead of `query`. <br/>examples: <br/>`- mysql: "testdb.table1" `<br/>`- oracle: "test_schema.table1" `<br/>`- sqlserver: "testdb.test_schema.table1"` <br/>`- postgresql: "testdb.test_schema.table1"` <br/>`- iris: "test_schema.table1"` [...]
 | table_list | Array | No | - | The list of tables to be read, you can use this configuration instead of `table_path` [...]
 | where_condition | String | No | - | Common row filter conditions for all tables/queries, must start with `where`. for example `where id > 100` [...]
-| split.size | Int | No | 8096 | How many rows in one split, captured tables are split into multiple splits when read of table. **Note**: This parameter takes effect only when using the `table_path` parameter. It does not take effect when using the `query` parameter. [...]
+| split.size | Int | No | 8096 | How many rows in one split, captured tables are split into multiple splits when read of table. **Note**: This parameter takes effect only when using the `table_path` parameter. It does not take effect when using the `query` parameter. [...]
 | split.even-distribution.factor.lower-bound | Double | No | 0.05 | Not recommended for use.<br/> The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the [...]
 | split.even-distribution.factor.upper-bound | Double | No | 100 | Not recommended for use.<br/> The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the [...]
 | split.sample-sharding.threshold | Int | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strat [...]
 | split.inverse-sampling.rate | Int | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is pref [...]
 | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. [...]
+| split.string_split_mode | String | No | sample | Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios. [...]
+| split.string_split_mode_collate | String | No | - | Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used. [...]

### decimal_type_narrowing

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 976650456b..ce1ac866cb 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;

-import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;

@@ -171,14 +171,14 @@ public interface JdbcOptions {
                     .noDefaultValue()
                     .withDescription("partition column");

-    Option<BigDecimal> PARTITION_UPPER_BOUND =
+    Option<String> PARTITION_UPPER_BOUND =
             Options.key("partition_upper_bound")
-                    .bigDecimalType()
+                    .stringType()
                     .noDefaultValue()
                     .withDescription("partition upper bound");

-    Option<BigDecimal> PARTITION_LOWER_BOUND =
+    Option<String> PARTITION_LOWER_BOUND =
             Options.key("partition_lower_bound")
-                    .bigDecimalType()
+                    .stringType()
                     .noDefaultValue()
                     .withDescription("partition lower bound");

     Option<Integer> PARTITION_NUM =
@@ -225,4 +225,18 @@ public interface JdbcOptions {
                     .mapType()
                     .noDefaultValue()
                     .withDescription("additional connection configuration parameters");
+
+    Option<StringSplitMode> STRING_SPLIT_MODE =
+            Options.key("split.string_split_mode")
+                    .enumType(StringSplitMode.class)
+                    .defaultValue(StringSplitMode.SAMPLE)
+                    .withDescription(
+                            "Supports different string splitting algorithms. By default, `sample` is used to determine the split by sampling the string value. You can switch to `charset_based` to enable charset-based string splitting algorithm. When set to `charset_based`, the algorithm assumes characters of partition_column are within ASCII range 32-126, which covers most character-based splitting scenarios.");
+
+    Option<String> STRING_SPLIT_MODE_COLLATE =
+            Options.key("split.string_split_mode_collate")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies the collation to use when string_split_mode is set to `charset_based` and the table has a special collation. If not specified, the database's default collation will be used.");
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 09cc92d70e..80d479d610 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.config;

 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.source.StringSplitMode;

 import lombok.Builder;
 import lombok.Data;
@@ -44,6 +45,10 @@ public class JdbcSourceConfig implements Serializable {
     private int splitInverseSamplingRate;
     private boolean decimalTypeNarrowing;

+    private StringSplitMode stringSplitMode;
+
+    private String stringSplitModeCollate;
+
     public static JdbcSourceConfig of(ReadonlyConfig config) {
         JdbcSourceConfig.Builder builder = JdbcSourceConfig.builder();
         builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
@@ -55,7 +60,8 @@ public class JdbcSourceConfig implements Serializable {
                 config.getOptional(JdbcOptions.QUERY).isPresent()
                         && config.getOptional(JdbcOptions.PARTITION_COLUMN).isPresent();
         builder.useDynamicSplitter(!isOldVersion);
-
+        builder.stringSplitMode(config.get(JdbcOptions.STRING_SPLIT_MODE));
+        builder.stringSplitModeCollate(config.get(JdbcOptions.STRING_SPLIT_MODE_COLLATE));
         builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
         builder.splitEvenDistributionFactorUpperBound(
                 config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND));
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
index a3522e9c14..17c8d1790c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java
@@ -27,7 +27,6 @@ import lombok.Data;
 import lombok.experimental.Tolerate;

 import java.io.Serializable;
-import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -53,10 +52,10 @@ public class JdbcSourceTableConfig implements Serializable {
     private Integer partitionNumber;

     @JsonProperty("partition_lower_bound")
-    private BigDecimal partitionStart;
+    private String partitionStart;

     @JsonProperty("partition_upper_bound")
-    private BigDecimal partitionEnd;
+    private String partitionEnd;

     @JsonProperty("use_select_count")
     private Boolean useSelectCount;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 07fe42c07b..6ec44d92f8 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -814,4 +814,44 @@ public interface JdbcDialect extends Serializable {
     default String quotesDefaultValue(Object defaultValue) {
         return "'" + defaultValue + "'";
     }
+
+    default String getCollationSequence(Connection connection, String collate) {
+        StringBuilder sb = new StringBuilder();
+        String getDual = dualTable();
+        String baseQuery = "SELECT char_val FROM (";
+        StringBuilder unionQuery = new StringBuilder();
+        for (int i = 32; i <= 126; i++) {
+            if (i > 32) unionQuery.append(" UNION ALL ");
+            unionQuery.append("SELECT ? AS char_val ").append(getDual);
+        }
+        String sortedQuery =
+                baseQuery + unionQuery + ") ndi_tmp_chars ORDER BY " + getCollateSql(collate);
+        log.info("sortedCollationQuery is " + sortedQuery);
+        PreparedStatement preparedStatement;
+        try {
+            preparedStatement = connection.prepareStatement(sortedQuery);
+            for (int i = 32; i <= 126; i++) {
+                log.debug("setString " + (i - 32) + " => " + (char) i);
+                preparedStatement.setString(i - 32 + 1, String.valueOf((char) i));
+            }
+
+            ResultSet resultSet = preparedStatement.executeQuery();
+            while (resultSet.next()) {
+                sb.append(resultSet.getString("char_val"));
+            }
+            return sb.toString();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    default String getCollateSql(String collate) {
+        String getCollate =
+                StringUtils.isNotBlank(collate) ? "char_val COLLATE " + collate : "char_val";
+        return getCollate;
+    }
+
+    default String dualTable() {
+        return "";
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
index 5af57bf104..48f1f29875 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2Dialect.java
@@ -98,4 +98,9 @@ public class DB2Dialect implements JdbcDialect {

         return Optional.of(mergeStatement);
     }
+
+    @Override
+    public String dualTable() {
+        return " FROM SYSIBM.SYSDUMMY1 ";
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index 683de83e38..cc49ce6327 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -482,4 +482,24 @@ public class OracleDialect implements JdbcDialect {
             return rs.getString("NULLABLE").equals("Y");
         }
     }
+
+    @Override
+    public String dualTable() {
+        return " FROM dual ";
+    }
+
+    @Override
+    public String getCollateSql(String collate) {
+        if (StringUtils.isNotBlank(collate)) {
+            StringBuilder sql = new StringBuilder();
+            sql.append("NLSSORT(")
+                    .append("char_val")
+                    .append(", 'NLS_SORT=")
+                    .append(collate)
+                    .append("')");
+            return sql.toString();
+        } else {
+            return "char_val";
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
index ccb889bf14..ac970d1564 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialect.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -107,4 +109,22 @@ public class VerticaDialect implements JdbcDialect {

         return Optional.of(upsertSQL);
     }
+
+    /**
+     * <a
+     * href="https://docs.vertica.com/23.4.x/en/sql-reference/functions/data-type-specific-functions/string-functions/collation/">vertica-collation</a>
+     *
+     * @param collate
+     * @return
+     */
+    @Override
+    public String getCollateSql(String collate) {
+        if (StringUtils.isNotBlank(collate)) {
+            StringBuilder sql = new StringBuilder();
+            sql.append("COLLATION(").append("char_val").append(", '").append(collate).append("')");
+            return sql.toString();
+        } else {
+            return "char_val";
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index f4da0a8d94..c41f275f6a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -85,6 +85,16 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         }
     }

+    protected static String filterOutUppercase(String str) {
+        StringBuilder sb = new StringBuilder();
+        for (char c : str.toCharArray()) {
+            if (!Character.isUpperCase(c)) {
+                sb.append(c);
+            }
+        }
+        return sb.toString();
+    }
+
     public Collection<JdbcSourceSplit> generateSplits(JdbcSourceTable table) throws Exception {
         log.info("Start splitting table {} into chunks...", table.getTablePath());
         long start = System.currentTimeMillis();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java
new file mode 100644
index 0000000000..eb98a28a4c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CollationBasedSplitter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+
+@Slf4j
+public class CollationBasedSplitter {
+
+    public static BigInteger encodeStringToNumericRange(
+            String str,
+            int maxLength,
+            boolean paddingAtEnd,
+            boolean isCaseInsensitive,
+            String orderedCharset,
+            int radix) {
+        log.info(
+                "Converting string '{}' to BigInteger, maxLength={}, isCaseInsensitive={}",
+                str,
+                maxLength,
+                isCaseInsensitive);
+        String asciiString =
+                stringToAsciiString(
+                        str, maxLength, paddingAtEnd, isCaseInsensitive, orderedCharset);
+        log.info("String converted to ASCII representation: {}", asciiString);
+        int[] baseArray = parseBaseNumber(asciiString);
+        log.info("ASCII representation parsed to base array: {}", Arrays.toString(baseArray));
+        BigInteger result = toDecimal(baseArray, radix);
+        log.info("Final BigInteger result: {}", result);
+        return result;
+    }
+
+    public static String decodeNumericRangeToString(
+            String bigInteger, int maxLength, int radix, String orderedCharset) {
+        log.info(
+                "Converting BigInteger '{}' to string, maxLength={}, radix={}",
+                bigInteger,
+                maxLength,
+                radix);
+        int[] baseArray = fromDecimal(new BigInteger(bigInteger), maxLength, radix);
+        log.info("BigInteger converted to base array: {}", Arrays.toString(baseArray));
+        String formattedNumber = formatBaseNumber(baseArray);
+        log.info("Base array formatted as number string: {}", formattedNumber);
+        String result = convertToAsciiString(formattedNumber, orderedCharset);
+        log.info("Final string result: '{}'", result);
+        return result;
+    }
+
+    private static int[] parseBaseNumber(String numberStr) {
+        log.trace("Parsing base number from string: {}", numberStr);
+        String[] parts = numberStr.split(" ");
+        int[] result = new int[parts.length];
+        for (int i = 0; i < parts.length; i++) {
+            result[i] = Integer.parseInt(parts[i]);
+        }
+        log.trace("Parsed base number result: {}", Arrays.toString(result));
+        return result;
+    }
+
+    private static String formatBaseNumber(int[] number) {
+        log.trace("Formatting base number array: {}", Arrays.toString(number));
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < number.length; i++) {
+            if (i > 0) sb.append(" ");
+            sb.append(String.format("%03d", number[i]));
+        }
+        String result = sb.toString();
+        log.trace("Formatted base number: {}", result);
+        return result;
+    }
+
+    private static int charToIndex(char c, String supportedChars) {
+        int result = (c == '\u0000') ? 0 : supportedChars.indexOf(c) + 1;
+        log.trace("Char '{}' converted to index: {}", c, result);
+        return result;
+    }
+
+    private static char indexToChar(int index, String supportedChars) {
+        char result = (index == 0) ? '\u0001' : supportedChars.charAt(index - 1);
+        log.trace("Index {} converted to char: '{}'", index, result);
+        return result;
+    }
+
+    private static BigInteger toDecimal(int[] array, int radix) {
+        log.trace(
+                "Converting array {} to decimal with charset size {}",
+                Arrays.toString(array),
+                radix);
+        BigInteger result = BigInteger.ZERO;
+        for (int i = 0; i < array.length; i++) {
+            BigInteger value = BigInteger.valueOf(array[i]);
+            BigInteger multiplier = BigInteger.valueOf(radix).pow(array.length - 1 - i);
+            result = result.add(value.multiply(multiplier));
+        }
+        log.trace("Decimal conversion result: {}", result);
+        return result;
+    }
+
+    private static int[] fromDecimal(BigInteger decimal, int length, int base) {
+        log.trace("Converting decimal {} to base {} array of length {}", decimal, base, length);
+        int[] result = new int[length];
+        BigInteger remainder = decimal;
+        for (int i = length - 1; i >= 0; i--) {
+            BigInteger divisor = BigInteger.valueOf(base).pow(i);
+            int value = remainder.divide(divisor).intValue();
+            remainder = remainder.mod(divisor);
+            result[length - 1 - i] = value;
+        }
+        log.trace("Base conversion result: {}", Arrays.toString(result));
+        return result;
+    }
+
+    private static String stringToAsciiString(
+            String s,
+            int expectedLength,
+            boolean paddingAtEnd,
+            boolean isCaseInsensitive,
+            String supportedChars) {
+        log.trace(
+                "Converting string '{}' to ASCII string, expectedLength={}, paddingAtEnd={}, isCaseInsensitive={}",
+                s,
+                expectedLength,
+                paddingAtEnd,
+                isCaseInsensitive);
+        String str = isCaseInsensitive ? s.toLowerCase() : s;
+        char[] paddedChars = new char[expectedLength];
+
+        if (paddingAtEnd) {
+            for (int i = 0; i < expectedLength; i++) {
+                if (i < str.length()) {
+                    paddedChars[i] = str.charAt(i);
+                } else {
+                    paddedChars[i] = '\u0000';
+                }
+            }
+            log.trace("Applied suffix padding to string");
+        } else {
+            int offset = expectedLength - str.length();
+            for (int i = 0; i < expectedLength; i++) {
+                if (i < offset) {
+                    paddedChars[i] = '\u0000';
+                } else {
+                    paddedChars[i] = str.charAt(i - offset);
+                }
+            }
+            log.trace("Applied prefix padding to string");
+        }
+
+        StringBuilder result = new StringBuilder();
+        for (int i = 0; i < paddedChars.length; i++) {
+            if (i > 0) result.append(" ");
+            result.append(String.format("%03d", charToIndex(paddedChars[i], supportedChars)));
+        }
+        String asciiResult = result.toString();
+        log.trace("ASCII string conversion result: {}", asciiResult);
+        return asciiResult;
+    }
+
+    private static String convertToAsciiString(String input, String supportedChars) {
+        log.trace("Converting ASCII representation '{}' back to string", input);
+        String[] asciiValues = input.split(" ");
+        StringBuilder result = new StringBuilder();
+
+        for (String value : asciiValues) {
+            char c = indexToChar(Integer.parseInt(value), supportedChars);
+            result.append(c);
+        }
+
+        String resultString = result.toString();
+        if (resultString.replaceAll("\u0001", "").isEmpty()) {
+            log.trace("Detected all placeholder characters, returning empty string");
+            return "";
+        } else {
+            log.trace("ASCII to string conversion result: '{}'", resultString);
+            return resultString;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index d958c405df..e2bac89780 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;

 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -56,6 +57,9 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 @Slf4j
 public class DynamicChunkSplitter extends ChunkSplitter {

+    private final boolean useCharsetBasedStringSplitter =
+            StringSplitMode.CHARSET_BASED.equals(config.getStringSplitMode());
+
     public DynamicChunkSplitter(JdbcSourceConfig config) {
         super(config);
     }
@@ -124,8 +128,14 @@ public class DynamicChunkSplitter extends ChunkSplitter {
             case DECIMAL:
             case DOUBLE:
             case FLOAT:
-            case STRING:
                 return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+            case STRING:
+                if (useCharsetBasedStringSplitter) {
+                    return charsetBasedColumnSplitChunks(
+                            table, splitColumnName, min, max, chunkSize);
+                } else {
+                    return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
+                }
             case DATE:
                 return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
             default:
@@ -134,6 +144,94 @@
         }
     }

+    private List<ChunkRange> charsetBasedColumnSplitChunks(
+            JdbcSourceTable table,
+            String splitColumnName,
+            Object objectMin,
+            Object objectMax,
+            int chunkSize)
+            throws Exception {
+        boolean paddingAtEnd = true;
+        boolean isCaseInsensitive = false;
+        String collationSequence =
+                jdbcDialect.getCollationSequence(
+                        getOrEstablishConnection(), config.getStringSplitModeCollate());
+        if (collationSequence.matches(".*[aA][Aa].*")) {
+            isCaseInsensitive = true;
+            collationSequence = filterOutUppercase(collationSequence);
+        }
+        int radix = collationSequence.length() + 1;
+        String minStr = objectMin.toString();
+        String maxStr = objectMax.toString();
+        int maxLength = Math.max(minStr.length(), maxStr.length());
+        BigInteger min =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        minStr,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        BigInteger max =
+                CollationBasedSplitter.encodeStringToNumericRange(
+                        maxStr,
+                        maxLength,
+                        paddingAtEnd,
+                        isCaseInsensitive,
+                        collationSequence,
+                        radix);
+        TablePath tablePath = table.getTablePath();
+        double distributionFactorUpper = config.getSplitEvenDistributionFactorUpperBound();
+        double distributionFactorLower = config.getSplitEvenDistributionFactorLowerBound();
+        int sampleShardingThreshold = config.getSplitSampleShardingThreshold();
+        log.info(
+                "Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
+                        + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
+                tablePath,
+                splitColumnName,
+                min,
+                max,
+                chunkSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                sampleShardingThreshold);
+
+        long approximateRowCnt = queryApproximateRowCnt(table);
+
+        double distributionFactor =
+                calculateDistributionFactor(tablePath, min, max, approximateRowCnt);
+
+        boolean dataIsEvenlyDistributed =
+                ObjectUtils.doubleCompare(distributionFactor, distributionFactorLower) >= 0
+                        && ObjectUtils.doubleCompare(distributionFactor, distributionFactorUpper)
+                                <= 0;
+
+        if (dataIsEvenlyDistributed) {
+            // the minimum dynamic chunk size is at least 1
+            final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
+            return splitStringEvenlySizedChunks(
+                    tablePath,
+                    min,
+                    max,
+                    approximateRowCnt,
+                    chunkSize,
+                    dynamicChunkSize,
+                    maxLength,
+                    radix,
+                    collationSequence);
+        } else {
+            return getChunkRangesWithUnevenlyData(
+                    table,
+                    splitColumnName,
+                    min,
+                    max,
+                    chunkSize,
+                    tablePath,
+                    sampleShardingThreshold,
+                    approximateRowCnt);
+        }
+    }
+
     private List<ChunkRange> evenlyColumnSplitChunks(
             JdbcSourceTable table, String splitColumnName, Object min, Object max, int chunkSize)
             throws Exception {
@@ -169,42 +267,63 @@
             return splitEvenlySizedChunks(
                     tablePath, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
         } else {
-            int shardCount = (int) (approximateRowCnt / chunkSize);
-            int inverseSamplingRate = config.getSplitInverseSamplingRate();
-            if (sampleShardingThreshold < shardCount) {
-                // It is necessary to ensure that the number of data rows sampled by the
-                // sampling rate is greater than the number of shards.
-                // Otherwise, if the sampling rate is too low, it may result in an insufficient
-                // number of data rows for the shards, leading to an inadequate number of
-                // shards.
-                // Therefore, inverseSamplingRate should be less than chunkSize
-                if (inverseSamplingRate > chunkSize) {
-                    log.warn(
-                            "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
-                            inverseSamplingRate,
-                            chunkSize);
-                    inverseSamplingRate = chunkSize;
-                }
-                log.info(
-                        "Use sampling sharding for table {}, the sampling rate is {}",
-                        tablePath,
-                        inverseSamplingRate);
-                Object[] sample =
-                        jdbcDialect.sampleDataFromColumn(
-                                getOrEstablishConnection(),
-                                table,
-                                splitColumnName,
-                                inverseSamplingRate,
-                                config.getFetchSize());
-                log.info(
-                        "Sample data from table {} end, the sample size is {}",
-                        tablePath,
-                        sample.length);
-                return efficientShardingThroughSampling(
-                        tablePath, sample, approximateRowCnt, shardCount);
+            return getChunkRangesWithUnevenlyData(
+                    table,
+                    splitColumnName,
+                    min,
+                    max,
+                    chunkSize,
+                    tablePath,
+                    sampleShardingThreshold,
+                    approximateRowCnt);
+        }
+    }
+
+    private List<ChunkRange> getChunkRangesWithUnevenlyData(
+            JdbcSourceTable table,
+            String splitColumnName,
+            Object min,
+            Object max,
+            int chunkSize,
+            TablePath tablePath,
+            int sampleShardingThreshold,
+            long approximateRowCnt)
+            throws Exception {
+        int shardCount = (int) (approximateRowCnt / chunkSize);
+        int inverseSamplingRate = config.getSplitInverseSamplingRate();
+        if (sampleShardingThreshold < shardCount) {
+            // It is necessary to ensure that the number of data rows sampled by the
+            // sampling rate is greater than the number of shards.
+            // Otherwise, if the sampling rate is too low, it may result in an insufficient
+            // number of data rows for the shards, leading to an inadequate number of
+            // shards.
+            // Therefore, inverseSamplingRate should be less than chunkSize
+            if (inverseSamplingRate > chunkSize) {
+                log.warn(
+                        "The inverseSamplingRate is {}, which is greater than chunkSize {}, so we set inverseSamplingRate to chunkSize",
+                        inverseSamplingRate,
+                        chunkSize);
+                inverseSamplingRate = chunkSize;
             }
-            return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
+            log.info(
+                    "Use sampling sharding for table {}, the sampling rate is {}",
+                    tablePath,
+                    inverseSamplingRate);
+            Object[] sample =
+                    jdbcDialect.sampleDataFromColumn(
+                            getOrEstablishConnection(),
+                            table,
+                            splitColumnName,
+                            inverseSamplingRate,
+                            config.getFetchSize());
+            log.info(
+                    "Sample data from table {} end, the sample size is {}",
+                    tablePath,
+                    sample.length);
+            return efficientShardingThroughSampling(
+                    tablePath, sample, approximateRowCnt, shardCount);
         }
+        return splitUnevenlySizedChunks(table, splitColumnName, min, max, chunkSize);
     }

     private Long queryApproximateRowCnt(JdbcSourceTable table) throws SQLException {
@@ -238,6 +357,68 @@
         return distributionFactor;
     }

+    private List<ChunkRange> splitStringEvenlySizedChunks(
+            TablePath tablePath,
+            Object min,
+            Object max,
+            long approximateRowCnt,
+            int chunkSize,
+            int dynamicChunkSize,
+            int maxLength,
+            int radix,
+            String collationSequence) {
+        log.info(
+                "Use evenly-sized chunk optimization for
table {}, the approximate row count is {}, the chunk size is {}, the dynamic chunk size is {}", + tablePath, + approximateRowCnt, + chunkSize, + dynamicChunkSize); + if (approximateRowCnt <= chunkSize) { + // there is no more than one chunk, return full table as a chunk + return Collections.singletonList(ChunkRange.all()); + } + + final List<ChunkRange> splits = new ArrayList<>(); + Object chunkStart = null; + Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); + while (ObjectUtils.compare(chunkEnd, max) <= 0) { + splits.add( + ChunkRange.of( + chunkStart == null + ? null + : CollationBasedSplitter.decodeNumericRangeToString( + chunkStart.toString(), + maxLength, + radix, + collationSequence), + chunkEnd == null + ? null + : CollationBasedSplitter.decodeNumericRangeToString( + chunkEnd.toString(), + maxLength, + radix, + collationSequence))); + chunkStart = chunkEnd; + try { + chunkEnd = ObjectUtils.plus(chunkEnd, dynamicChunkSize); + } catch (ArithmeticException e) { + // Stop chunk split to avoid dead loop when number overflows. 
+ break; + } + } + // add the ending split + if (chunkStart != null) { + splits.add( + ChunkRange.of( + CollationBasedSplitter.decodeNumericRangeToString( + chunkStart.toString(), maxLength, radix, collationSequence), + null)); + } else { + splits.add(ChunkRange.of(null, null)); + } + return splits; + } + private List<ChunkRange> splitEvenlySizedChunks( TablePath tablePath, Object min, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java index 72a4e061ac..1509bdfc2b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; @@ -50,6 +51,9 @@ import java.util.List; @Slf4j public class FixedChunkSplitter extends ChunkSplitter { + private final boolean useCharsetBasedStringSplitter = + StringSplitMode.CHARSET_BASED.equals(config.getStringSplitMode()); + public FixedChunkSplitter(JdbcSourceConfig config) { super(config); } @@ -71,11 +75,107 @@ public class FixedChunkSplitter extends ChunkSplitter { } } if (SqlType.STRING.equals(splitKeyType.getSqlType())) { - return createStringColumnSplits(table, splitKeyName, splitKeyType); + log.info("useNewStringSplitter is {}", useCharsetBasedStringSplitter); + if 
(useCharsetBasedStringSplitter) { + return getJdbcSourceStringSplits(table, splitKeyName, splitKeyType); + } else { + return createStringColumnSplits(table, splitKeyName, splitKeyType); + } } + return getJdbcSourceSplits(table, splitKeyName, splitKeyType); + } - BigDecimal partitionStart = table.getPartitionStart(); - BigDecimal partitionEnd = table.getPartitionEnd(); + private Collection<JdbcSourceSplit> getJdbcSourceStringSplits( + JdbcSourceTable table, String splitKeyName, SeaTunnelDataType splitKeyType) + throws SQLException { + String partitionStart = table.getPartitionStart(); + String partitionEnd = table.getPartitionEnd(); + if (partitionStart == null || partitionEnd == null) { + Pair<String, String> range = findSplitStringColumnRange(table, splitKeyName); + partitionStart = range.getLeft(); + partitionEnd = range.getRight(); + } + if (partitionStart == null || partitionEnd == null) { + JdbcSourceSplit split = createSingleSplit(table); + return Collections.singletonList(split); + } + boolean paddingAtEnd = true; + boolean isCaseInsensitive = false; + String collationSequence = + jdbcDialect.getCollationSequence( + getOrEstablishConnection(), config.getStringSplitModeCollate()); + if (collationSequence.matches(".*[aA][Aa].*")) { + isCaseInsensitive = true; + collationSequence = filterOutUppercase(collationSequence); + } + int radix = collationSequence.length() + 1; + int maxLength = Math.max(partitionStart.length(), partitionEnd.length()); + BigInteger min = + CollationBasedSplitter.encodeStringToNumericRange( + partitionStart, + maxLength, + paddingAtEnd, + isCaseInsensitive, + collationSequence, + radix); + BigInteger max = + CollationBasedSplitter.encodeStringToNumericRange( + partitionEnd, + maxLength, + paddingAtEnd, + isCaseInsensitive, + collationSequence, + radix); + Collection<JdbcSourceSplit> numberColumnSplits = + createNumberColumnSplits( + table, + splitKeyName, + splitKeyType, + new BigDecimal(min), + new BigDecimal(max)); + if 
(CollectionUtils.isNotEmpty(numberColumnSplits)) { + List<JdbcSourceSplit> result = new ArrayList<>(); + int index = 0; + for (JdbcSourceSplit jdbcSourceSplit : numberColumnSplits) { + result.add( + new JdbcSourceSplit( + jdbcSourceSplit.getTablePath(), + jdbcSourceSplit.getSplitId(), + jdbcSourceSplit.getSplitQuery(), + jdbcSourceSplit.getSplitKeyName(), + jdbcSourceSplit.getSplitKeyType(), + index == 0 + ? partitionStart + : CollationBasedSplitter.decodeNumericRangeToString( + jdbcSourceSplit.getSplitStart().toString(), + maxLength, + radix, + collationSequence), + index == numberColumnSplits.size() - 1 + ? partitionEnd + : CollationBasedSplitter.decodeNumericRangeToString( + jdbcSourceSplit.getSplitEnd().toString(), + maxLength, + radix, + collationSequence))); + index++; + } + return result; + } + return numberColumnSplits; + } + + private Collection<JdbcSourceSplit> getJdbcSourceSplits( + JdbcSourceTable table, String splitKeyName, SeaTunnelDataType splitKeyType) + throws SQLException { + BigDecimal partitionStart = + StringUtils.isBlank(table.getPartitionStart()) + ? null + : new BigDecimal(table.getPartitionStart()); + BigDecimal partitionEnd = + StringUtils.isBlank(table.getPartitionEnd()) + ? 
null + : new BigDecimal(table.getPartitionEnd()); if (partitionStart == null || partitionEnd == null) { Pair<BigDecimal, BigDecimal> range = findSplitColumnRange(table, splitKeyName); partitionStart = range.getLeft(); @@ -93,7 +193,8 @@ public class FixedChunkSplitter extends ChunkSplitter { @Override protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema) throws SQLException { - if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())) { + if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType()) + && !useCharsetBasedStringSplitter) { return createStringColumnSplitStatement(split); } if (split.getSplitStart() == null && split.getSplitEnd() == null) { @@ -244,6 +345,20 @@ public class FixedChunkSplitter extends ChunkSplitter { return statement; } + private Pair<String, String> findSplitStringColumnRange( + JdbcSourceTable table, String columnName) throws SQLException { + Pair<Object, Object> splitColumnRange = queryMinMax(table, columnName); + Object min = splitColumnRange.getLeft(); + Object max = splitColumnRange.getRight(); + if (min != null) { + min = min.toString(); + } + if (max != null) { + max = max.toString(); + } + return Pair.of(((String) min), ((String) max)); + } + private Pair<BigDecimal, BigDecimal> findSplitColumnRange( JdbcSourceTable table, String columnName) throws SQLException { Pair<Object, Object> splitColumnRange = queryMinMax(table, columnName); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java index 8aad94c8b6..7144c9ed2d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java +++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java @@ -24,7 +24,6 @@ import lombok.Builder; import lombok.Data; import java.io.Serializable; -import java.math.BigDecimal; @Data @Builder @@ -35,8 +34,8 @@ public class JdbcSourceTable implements Serializable { private final String query; private final String partitionColumn; private final Integer partitionNumber; - private final BigDecimal partitionStart; - private final BigDecimal partitionEnd; + private final String partitionStart; + private final String partitionEnd; private final Boolean useSelectCount; private final Boolean skipAnalyze; private final CatalogTable catalogTable; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java similarity index 55% copy from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java index 8aad94c8b6..e020394085 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceTable.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/StringSplitMode.java @@ -17,27 +17,27 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.TablePath; - -import lombok.Builder; -import lombok.Data; - -import java.io.Serializable; -import java.math.BigDecimal; - -@Data -@Builder -public class JdbcSourceTable implements Serializable { - private static final 
long serialVersionUID = 1L; - - private final TablePath tablePath; - private final String query; - private final String partitionColumn; - private final Integer partitionNumber; - private final BigDecimal partitionStart; - private final BigDecimal partitionEnd; - private final Boolean useSelectCount; - private final Boolean skipAnalyze; - private final CatalogTable catalogTable; +public enum StringSplitMode { + SAMPLE("sample"), + + CHARSET_BASED("charset_based"); + + public boolean equals(String mode) { + return this.mode.equalsIgnoreCase(mode); + } + + private final String mode; + + StringSplitMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } + + @Override + public String toString() { + return mode; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java new file mode 100644 index 0000000000..c877e30827 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/CharsetBasedSplitterTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.source; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import lombok.extern.slf4j.Slf4j; + +import java.math.BigInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Slf4j +public class CharsetBasedSplitterTest { + + private static final String DEFAULT_CHARSET = "0123456789abcdefghijklmnopqrstuvwxyz"; + + @Test + @DisplayName("Test encoding of minimum and maximum values") + public void testMinMax() { + String minStr = "00000"; + String maxStr = "1"; + int maxLen = Math.max(minStr.length(), maxStr.length()); + String orderedCharset = "012a34b56789"; + BigInteger minBigInt = + CollationBasedSplitter.encodeStringToNumericRange( + minStr, maxLen, true, true, orderedCharset, orderedCharset.length() + 1); + log.info("Minimum value encoding: " + minBigInt); + + BigInteger maxBigInt = + CollationBasedSplitter.encodeStringToNumericRange( + maxStr, maxLen, true, true, orderedCharset, orderedCharset.length() + 1); + log.info("Maximum value encoding: " + maxBigInt); + + assert maxBigInt.compareTo(minBigInt) > 0; + } + + @Test + @DisplayName("Test consistency of string encoding and decoding") + public void testEncodeDecode() { + String original = "abc123"; + int maxLength = 10; + boolean paddingAtEnd = true; + boolean isCaseInsensitive = true; + int radix = DEFAULT_CHARSET.length() + 1; + + BigInteger encoded = + CollationBasedSplitter.encodeStringToNumericRange( + original, + maxLength, + paddingAtEnd, + isCaseInsensitive, + DEFAULT_CHARSET, + radix); + + String decoded = + CollationBasedSplitter.decodeNumericRangeToString( + encoded.toString(), maxLength, radix, DEFAULT_CHARSET); + + assertEquals(original.toLowerCase(), decoded.trim()); + } + + @Test + @DisplayName("Test charset with special characters") + public void testSpecialCharset() { + String 
customCharset = "!@#$%^&*()_+-=[]{}|;:,.<>?"; + String input = "!@#$%"; + int maxLength = 10; + int radix = customCharset.length() + 1; + + BigInteger encoded = + CollationBasedSplitter.encodeStringToNumericRange( + input, maxLength, true, false, customCharset, radix); + + String decoded = + CollationBasedSplitter.decodeNumericRangeToString( + encoded.toString(), maxLength, radix, customCharset); + + assertEquals(input, decoded.trim()); + } + + @Test + @DisplayName("Test impact of different padding positions") + public void testPaddingPosition() { + String input = "xyz"; + int maxLength = 5; + int radix = DEFAULT_CHARSET.length() + 1; + + BigInteger encodedPrefix = + CollationBasedSplitter.encodeStringToNumericRange( + input, maxLength, false, false, DEFAULT_CHARSET, radix); + String decodedPrefix = + CollationBasedSplitter.decodeNumericRangeToString( + encodedPrefix.toString(), maxLength, radix, DEFAULT_CHARSET); + + BigInteger encodedSuffix = + CollationBasedSplitter.encodeStringToNumericRange( + input, maxLength, true, false, DEFAULT_CHARSET, radix); + String decodedSuffix = + CollationBasedSplitter.decodeNumericRangeToString( + encodedSuffix.toString(), maxLength, radix, DEFAULT_CHARSET); + + assertEquals(input, decodedPrefix.trim()); + assertEquals(input, decodedSuffix.trim()); + + assert !encodedPrefix.equals(encodedSuffix); + } + + @Test + @DisplayName("Test performance") + public void testPerformance() { + int iterations = 1000; + String input = "abcdefghijklmnopqrstuvwxyz"; + int maxLength = 30; + int radix = DEFAULT_CHARSET.length() + 1; + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < iterations; i++) { + BigInteger encoded = + CollationBasedSplitter.encodeStringToNumericRange( + input, maxLength, true, true, DEFAULT_CHARSET, radix); + + String decoded = + CollationBasedSplitter.decodeNumericRangeToString( + encoded.toString(), maxLength, radix, DEFAULT_CHARSET); + + assertEquals(input, decoded.trim()); + } + + long endTime = 
System.currentTimeMillis(); + long duration = endTime - startTime; + + log.info( + "Executing " + + iterations + + " encoding/decoding operations took: " + + duration + + " milliseconds"); + log.info("Average time per operation: " + (double) duration / iterations + " milliseconds"); + } + + @Test + @DisplayName("Test encoding and decoding of random strings") + public void testRandomStrings() { + java.util.Random random = new java.util.Random(); + int testCount = 10; + int maxLength = 20; + int radix = DEFAULT_CHARSET.length() + 1; + for (int test = 0; test < testCount; test++) { + int length = random.nextInt(maxLength) + 1; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) { + int charIndex = random.nextInt(DEFAULT_CHARSET.length()); + sb.append(DEFAULT_CHARSET.charAt(charIndex)); + } + String randomString = sb.toString(); + BigInteger encoded = + CollationBasedSplitter.encodeStringToNumericRange( + randomString, maxLength, true, false, DEFAULT_CHARSET, radix); + + String decoded = + CollationBasedSplitter.decodeNumericRangeToString( + encoded.toString(), maxLength, radix, DEFAULT_CHARSET); + + log.info("Random string #" + test + ": " + randomString); + log.info("Encoding result: " + encoded); + log.info("Decoding result: " + decoded.trim()); + + assertEquals(randomString, decoded.trim()); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java index 428ace282b..11354ad3b5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGreenplumIT.java @@ -85,6 +85,7 @@ public class JdbcGreenplumIT extends AbstractJdbcIT { .insertSql(insertSql) .testData(testDataSet) .tablePathFullName(GREENPLUM_SOURCE) + .useSaveModeCreateTable(false) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf index 2f5314e67a..bfb5304157 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_greenplum_source_and_sink.conf @@ -31,6 +31,8 @@ source { user = tester password = pivotal query = "select age, name from source" + partition_column = "name" + split.string_split_mode = charset_based } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java index 6dcc6e51ba..21567ac818 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlSplitIT.java @@ -30,6 +30,7 @@ import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.DynamicChunkSplitter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.FixedChunkSplitter; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit; import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; import org.apache.seatunnel.e2e.common.TestResource; @@ -538,6 +539,13 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource { return splitter; } + @NotNull private FixedChunkSplitter getFixedChunkSplitter(Map<String, Object> configMap) { + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(configMap); + JdbcSourceConfig sourceConfig = JdbcSourceConfig.of(readonlyConfig); + FixedChunkSplitter splitter = new FixedChunkSplitter(sourceConfig); + return splitter; + } + @Override public void tearDown() throws Exception { if (mysql_container != null) { @@ -545,4 +553,155 @@ public class JdbcMysqlSplitIT extends TestSuiteBase implements TestResource { dockerClient.removeContainerCmd(mysql_container.getContainerId()).exec(); } } + + @Test + public void testDynamicCharSplit() { + Map<String, Object> configMap = new HashMap<>(); + configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get()); + configMap.put("driver", "com.mysql.cj.jdbc.Driver"); + configMap.put("user", MYSQL_USERNAME); + configMap.put("password", MYSQL_PASSWORD); + configMap.put("table_path", MYSQL_DATABASE + "." 
+ MYSQL_TABLE); + configMap.put("split.size", "10"); + configMap.put("split.string_split_mode", "charset_based"); + + TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE); + MySqlCatalog mySqlCatalog = + new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null); + mySqlCatalog.open(); + Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql)); + CatalogTable table = mySqlCatalog.getTable(tablePathMySql); + + String[] charColumns = { + "c_char", "c_varchar", "c_tinytext", "c_text", "c_mediumtext", "c_longtext" + }; + + for (String charColumn : charColumns) { + try { + LOG.info("Testing split on character column: {}", charColumn); + configMap.put("partition_column", charColumn); + DynamicChunkSplitter splitter = getDynamicChunkSplitter(configMap); + + JdbcSourceTable jdbcSourceTable = + JdbcSourceTable.builder() + .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE)) + .catalogTable(table) + .partitionColumn(charColumn) + .build(); + + Collection<JdbcSourceSplit> jdbcSourceSplits = + splitter.generateSplits(jdbcSourceTable); + + LOG.info( + "Split results for column {}: {} splits", + charColumn, + jdbcSourceSplits.size()); + int splitIndex = 0; + for (JdbcSourceSplit split : jdbcSourceSplits) { + LOG.info( + "Split {}: key={}, start={}, end={}", + splitIndex++, + split.getSplitKeyName(), + split.getSplitStart(), + split.getSplitEnd()); + } + + if (!jdbcSourceSplits.isEmpty()) { + JdbcSourceSplit[] splitArray = jdbcSourceSplits.toArray(new JdbcSourceSplit[0]); + Assertions.assertEquals(charColumn, splitArray[0].getSplitKeyName()); + printCharSplitBoundaries(splitArray); + } + } catch (Exception e) { + LOG.error("Error splitting on column {}: {}", charColumn, e.getMessage(), e); + } + } + + mySqlCatalog.close(); + } + + @Test + public void testFixedCharSplit() { + Map<String, Object> configMap = new HashMap<>(); + configMap.put("url", mysqlUrlInfo.getUrlWithDatabase().get()); + configMap.put("driver", 
"com.mysql.cj.jdbc.Driver"); + configMap.put("user", MYSQL_USERNAME); + configMap.put("password", MYSQL_PASSWORD); + configMap.put("table_path", MYSQL_DATABASE + "." + MYSQL_TABLE); + configMap.put("split.string_split_mode", "charset_based"); + + TablePath tablePathMySql = TablePath.of(MYSQL_DATABASE, MYSQL_TABLE); + MySqlCatalog mySqlCatalog = + new MySqlCatalog("mysql", MYSQL_USERNAME, MYSQL_PASSWORD, mysqlUrlInfo, null); + mySqlCatalog.open(); + Assertions.assertTrue(mySqlCatalog.tableExists(tablePathMySql)); + CatalogTable table = mySqlCatalog.getTable(tablePathMySql); + + String[] charColumns = { + "c_bigint", "c_varchar", "c_tinytext", "c_text", "c_mediumtext", "c_longtext", "c_char" + }; + + for (String charColumn : charColumns) { + try { + LOG.info("Testing split on character column: {}", charColumn); + configMap.put("partition_column", charColumn); + FixedChunkSplitter splitter = getFixedChunkSplitter(configMap); + + JdbcSourceTable jdbcSourceTable = + JdbcSourceTable.builder() + .tablePath(TablePath.of(MYSQL_DATABASE, MYSQL_TABLE)) + .catalogTable(table) + .partitionColumn(charColumn) + .partitionNumber(10) + .build(); + + Collection<JdbcSourceSplit> jdbcSourceSplits = + splitter.generateSplits(jdbcSourceTable); + + LOG.info( + "Split results for column {}: {} splits", + charColumn, + jdbcSourceSplits.size()); + int splitIndex = 0; + for (JdbcSourceSplit split : jdbcSourceSplits) { + LOG.info( + "Split {}: key={}, start={}, end={}", + splitIndex++, + split.getSplitKeyName(), + split.getSplitStart(), + split.getSplitEnd()); + } + } catch (Exception e) { + LOG.error("Error splitting on column {}: {}", charColumn, e.getMessage(), e); + } + } + + mySqlCatalog.close(); + } + + private void printCharSplitBoundaries(JdbcSourceSplit[] splitArray) { + LOG.info("Character column split boundaries:"); + for (int i = 0; i < splitArray.length; i++) { + Object start = splitArray[i].getSplitStart(); + Object end = splitArray[i].getSplitEnd(); + + LOG.info( + "Split {}: 
start={}, end={}", + i, + start == null ? "NULL" : "'" + start.toString() + "'", + end == null ? "NULL" : "'" + end.toString() + "'"); + + if (i == 0) { + Assertions.assertNull(start, "First split should start with NULL"); + } + + if (i == splitArray.length - 1) { + Assertions.assertNull(end, "Last split should end with NULL"); + } + + if (i > 0 && i < splitArray.length - 1) { + Assertions.assertNotNull(start, "Middle split should have non-null start"); + Assertions.assertNotNull(end, "Middle split should have non-null end"); + } + } + } }