This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d8e1440f30 Support unique key first integer column exact splitting 
(#37517)
8d8e1440f30 is described below

commit 8d8e1440f301be7c42a37b2234beb12260d789e5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 19:19:04 2025 +0800

    Support unique key first integer column exact splitting (#37517)
    
    * Add IntegerPositionHandler
    
    * Extract DataSourceTestUtils
    
    * Add InventoryIntegerPositionExactCalculatorTest
    
    * Update RELEASE-NOTES.md
---
 RELEASE-NOTES.md                                   |   1 +
 .../position/exact/IntegerPositionHandler.java     |  45 ++++++++
 .../exact/InventoryPositionExactCalculator.java    |  10 +-
 .../RecordTableInventoryCheckCalculatorTest.java   |  19 +---
 ...nventoryIntegerPositionExactCalculatorTest.java | 113 +++++++++++++++++++++
 .../pipeline/core/util/DataSourceTestUtils.java    |  44 ++++++++
 6 files changed, 212 insertions(+), 20 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 50f97b80f92..bbd3f95e7f4 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -66,6 +66,7 @@
 1. Pipeline: InventoryDumper reuse table inventory calculator for better 
function and performance - 
[#36830](https://github.com/apache/shardingsphere/pull/36830)
 1. Pipeline: Improve "alter transmission rule": verify STREAM_CHANNEL TYPE 
NAME - [#36864](https://github.com/apache/shardingsphere/pull/36864)
 1. Pipeline: InventoryDumperContextSplitter supports multi-columns unique key 
first integer column splitting - 
[#36935](https://github.com/apache/shardingsphere/pull/36935)
+1. Pipeline: Support unique key first integer column exact splitting - 
[#37517](https://github.com/apache/shardingsphere/pull/37517)
 1. Encrypt: Support handling show create view result decoration in encrypt - 
[#37299](https://github.com/apache/shardingsphere/pull/37299)
 1. JDBC: Enhance ResultSetUtils to support flexible string date/time 
conversions - [37424](https://github.com/apache/shardingsphere/pull/37424)
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
new file mode 100644
index 00000000000..787d2cf6bd8
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Position handler for unique key columns of integer type.
+ *
+ * <p>Column values are read from and bound to JDBC as {@code long}, and value ranges are wrapped in {@link IntegerPrimaryKeyIngestPosition}.</p>
+ */
+public final class IntegerPositionHandler implements DataTypePositionHandler<Long> {
+    
+    @Override
+    public Long readColumnValue(final ResultSet resultSet, final int columnIndex) throws SQLException {
+        // JDBC reports SQL NULL as 0 from getLong, so the boxed result is never null here.
+        long result = resultSet.getLong(columnIndex);
+        return result;
+    }
+    
+    @Override
+    public void setPreparedStatementValue(final PreparedStatement preparedStatement, final int parameterIndex, final Long value) throws SQLException {
+        // Unboxing happens before the JDBC call, so value must not be null.
+        long parameterValue = value;
+        preparedStatement.setLong(parameterIndex, parameterValue);
+    }
+    
+    @Override
+    public IntegerPrimaryKeyIngestPosition createIngestPosition(final Long lowerValue, final Long upperValue) {
+        IntegerPrimaryKeyIngestPosition result = new IntegerPrimaryKeyIngestPosition(lowerValue, upperValue);
+        return result;
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index b392abd73e1..0f8712ef837 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -76,12 +76,12 @@ public final class InventoryPositionExactCalculator {
                     return positionHandler.createIngestPosition(null, null);
                 }
                 long count = resultSet.getLong(2);
-                log.info("First records count is 0, return. First query SQL: 
{}", firstQuerySQL);
+                T minValue = positionHandler.readColumnValue(resultSet, 3);
+                T maxValue = positionHandler.readColumnValue(resultSet, 1);
+                log.info("First records count: {}, min value: {}, max value: 
{}, sharding size: {}, first query SQL: {}", count, minValue, maxValue, 
shardingSize, firstQuerySQL);
                 if (0 == count) {
                     return positionHandler.createIngestPosition(null, null);
                 }
-                T minValue = positionHandler.readColumnValue(resultSet, 3);
-                T maxValue = positionHandler.readColumnValue(resultSet, 1);
                 return positionHandler.createIngestPosition(minValue, 
maxValue);
             }
         } catch (final SQLException ex) {
@@ -107,14 +107,14 @@ public final class InventoryPositionExactCalculator {
                     if (!resultSet.next()) {
                         break;
                     }
-                    T maxValue = positionHandler.readColumnValue(resultSet, 1);
                     int count = resultSet.getInt(2);
                     if (0 == count) {
-                        log.info("Done. Later records count: {}, later query 
SQL: {}, last lower value: {}", recordsCount, laterQuerySQL, lowerValue);
+                        log.info("Done. Later records count: {}, last lower 
value: {}, sharding size: {}, later query SQL: {}", recordsCount, lowerValue, 
shardingSize, laterQuerySQL);
                         break;
                     }
                     recordsCount += count;
                     T minValue = positionHandler.readColumnValue(resultSet, 3);
+                    T maxValue = positionHandler.readColumnValue(resultSet, 1);
                     result.add(positionHandler.createIngestPosition(minValue, 
maxValue));
                     lowerValue = maxValue;
                 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
index 27b689b53d2..847ac735a95 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
@@ -17,8 +17,7 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
 
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.text.RandomStringGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordTableInventoryCheckCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
@@ -27,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.quer
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -58,7 +58,8 @@ class RecordTableInventoryCheckCalculatorTest {
     
     @BeforeAll
     static void setUp() throws Exception {
-        dataSource = new PipelineDataSource(createHikariDataSource("calc_" + 
RandomStringUtils.randomAlphanumeric(9)), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
+        String databaseName = "check_calc_" + 
RandomStringGenerator.builder().withinRange('a', 'z').build().generate(9);
+        dataSource = new 
PipelineDataSource(DataSourceTestUtils.createHikariDataSource(databaseName), 
TypedSPILoader.getService(DatabaseType.class, "H2"));
         createTableAndInitData(dataSource);
     }
     
@@ -67,18 +68,6 @@ class RecordTableInventoryCheckCalculatorTest {
         dataSource.close();
     }
     
-    private static HikariDataSource createHikariDataSource(final String 
databaseName) {
-        HikariDataSource result = new HikariDataSource();
-        
result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL",
 databaseName));
-        result.setUsername("root");
-        result.setPassword("root");
-        result.setMaximumPoolSize(10);
-        result.setMinimumIdle(2);
-        result.setConnectionTimeout(15L * 1000L);
-        result.setIdleTimeout(40L * 1000L);
-        return result;
-    }
-    
     private static void createTableAndInitData(final PipelineDataSource 
dataSource) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             String sql = "CREATE TABLE t_order (user_id INT NOT NULL, order_id 
INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
new file mode 100644
index 00000000000..f155cd679fe
--- /dev/null
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import org.apache.commons.text.RandomStringGenerator;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test of {@link InventoryPositionExactCalculator} combined with {@link IntegerPositionHandler} on an in-memory H2 table.
+ */
+class InventoryIntegerPositionExactCalculatorTest {
+    
+    private static PipelineDataSource dataSource;
+    
+    @BeforeAll
+    static void setUp() throws Exception {
+        // Random suffix keeps the in-memory database name distinct from other test classes.
+        String databaseName = "pos_i_calc_" + RandomStringGenerator.builder().withinRange('a', 'z').build().generate(9);
+        dataSource = new PipelineDataSource(DataSourceTestUtils.createHikariDataSource(databaseName), TypedSPILoader.getService(DatabaseType.class, "H2"));
+        createTableAndInitData(dataSource);
+    }
+    
+    @AfterAll
+    static void tearDown() {
+        // setUp may fail before the data source is assigned; avoid masking that failure with a NullPointerException.
+        if (null != dataSource) {
+            dataSource.close();
+        }
+    }
+    
+    /**
+     * Create table t_order and insert 11 rows: order_id is unique (1 to 11), user_id repeats (3 occurs five times, 6 twice).
+     *
+     * @param dataSource data source to initialize
+     * @throws SQLException SQL exception
+     */
+    private static void createTableAndInitData(final PipelineDataSource dataSource) throws SQLException {
+        String createTableSQL = "CREATE TABLE t_order (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
+        String insertSQL = "INSERT INTO t_order (user_id, order_id, status) VALUES (?, ?, ?)";
+        try (Connection connection = dataSource.getConnection()) {
+            // Statements are closed explicitly instead of relying on the pooled connection being returned.
+            try (PreparedStatement createTableStatement = connection.prepareStatement(createTableSQL)) {
+                createTableStatement.execute();
+            }
+            // The insert statement is prepared only after the table exists.
+            try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+                insertRecord(preparedStatement, 1, 1);
+                insertRecord(preparedStatement, 2, 2);
+                insertRecord(preparedStatement, 3, 3);
+                insertRecord(preparedStatement, 3, 4);
+                insertRecord(preparedStatement, 3, 5);
+                insertRecord(preparedStatement, 3, 6);
+                insertRecord(preparedStatement, 3, 7);
+                insertRecord(preparedStatement, 4, 8);
+                insertRecord(preparedStatement, 5, 9);
+                insertRecord(preparedStatement, 6, 10);
+                insertRecord(preparedStatement, 6, 11);
+            }
+        }
+    }
+    
+    private static void insertRecord(final PreparedStatement preparedStatement, final int userId, final int orderId) throws SQLException {
+        preparedStatement.setInt(1, userId);
+        preparedStatement.setInt(2, orderId);
+        preparedStatement.setString(3, "OK");
+        preparedStatement.executeUpdate();
+    }
+    
+    @Test
+    void assertGetPositionsWithOrderIdUniqueKey() {
+        List<IngestPosition> actual = InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, "t_order"), "order_id", 3, dataSource, new IntegerPositionHandler());
+        assertThat(actual.size(), is(4));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new IntegerPrimaryKeyIngestPosition(1L, 3L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new IntegerPrimaryKeyIngestPosition(4L, 6L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(2), new IntegerPrimaryKeyIngestPosition(7L, 9L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(3), new IntegerPrimaryKeyIngestPosition(10L, 11L));
+    }
+    
+    @Test
+    void assertGetPositionsWithMultiColumnUniqueKeys() {
+        List<IngestPosition> actual = InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, "t_order"), "user_id", 3, dataSource, new IntegerPositionHandler());
+        assertThat(actual.size(), is(2));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new IntegerPrimaryKeyIngestPosition(1L, 3L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new IntegerPrimaryKeyIngestPosition(4L, 6L));
+    }
+    
+    private void assertIntegerPrimaryKeyIngestPosition0(final IngestPosition actual, final IntegerPrimaryKeyIngestPosition expected) {
+        // Check the runtime type first so a wrong position type fails as an assertion rather than a ClassCastException.
+        assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
+        IntegerPrimaryKeyIngestPosition position = (IntegerPrimaryKeyIngestPosition) actual;
+        assertThat(position.getType(), is(expected.getType()));
+        assertThat(position.getBeginValue(), is(expected.getBeginValue()));
+        assertThat(position.getEndValue(), is(expected.getEndValue()));
+    }
+}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
new file mode 100644
index 00000000000..eb57788e998
--- /dev/null
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * Data source utilities for pipeline integration tests.
+ */
+// AccessLevel.NONE makes Lombok generate no constructor at all, which leaves the implicit public one in place;
+// PRIVATE is what actually prevents instantiation of this utility class.
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DataSourceTestUtils {
+    
+    /**
+     * Create Hikari data source on an in-memory H2 database running in MySQL mode.
+     *
+     * @param databaseName database name
+     * @return Hikari data source
+     */
+    public static HikariDataSource createHikariDataSource(final String databaseName) {
+        HikariDataSource result = new HikariDataSource();
+        result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL", databaseName));
+        result.setUsername("root");
+        result.setPassword("root");
+        result.setMaximumPoolSize(10);
+        result.setMinimumIdle(2);
+        // Timeouts are in milliseconds: 15 seconds to obtain a connection, 40 seconds before an idle one is retired.
+        result.setConnectionTimeout(15L * 1000L);
+        result.setIdleTimeout(40L * 1000L);
+        return result;
+    }
+}

Reply via email to