This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8d8e1440f30 Support unique key first integer column exact splitting
(#37517)
8d8e1440f30 is described below
commit 8d8e1440f301be7c42a37b2234beb12260d789e5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 19:19:04 2025 +0800
Support unique key first integer column exact splitting (#37517)
* Add IntegerPositionHandler
* Extract DataSourceTestUtils
* Add InventoryIntegerPositionExactCalculatorTest
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 1 +
.../position/exact/IntegerPositionHandler.java | 45 ++++++++
.../exact/InventoryPositionExactCalculator.java | 10 +-
.../RecordTableInventoryCheckCalculatorTest.java | 19 +---
...nventoryIntegerPositionExactCalculatorTest.java | 113 +++++++++++++++++++++
.../pipeline/core/util/DataSourceTestUtils.java | 44 ++++++++
6 files changed, 212 insertions(+), 20 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 50f97b80f92..bbd3f95e7f4 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -66,6 +66,7 @@
1. Pipeline: InventoryDumper reuse table inventory calculator for better
function and performance -
[#36830](https://github.com/apache/shardingsphere/pull/36830)
1. Pipeline: Improve "alter transmission rule": verify STREAM_CHANNEL TYPE
NAME - [#36864](https://github.com/apache/shardingsphere/pull/36864)
1. Pipeline: InventoryDumperContextSplitter supports multi-columns unique key
first integer column splitting -
[#36935](https://github.com/apache/shardingsphere/pull/36935)
+1. Pipeline: Support unique key first integer column exact splitting -
[#37517](https://github.com/apache/shardingsphere/pull/37517)
1. Encrypt: Support handling show create view result decoration in encrypt -
[#37299](https://github.com/apache/shardingsphere/pull/37299)
1. JDBC: Enhance ResultSetUtils to support flexible string date/time
conversions - [#37424](https://github.com/apache/shardingsphere/pull/37424)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
new file mode 100644
index 00000000000..787d2cf6bd8
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Integer position handler.
+ *
+ * <p>Reads and binds column values as {@code long} through JDBC and wraps a lower/upper value pair into an {@link IntegerPrimaryKeyIngestPosition}.</p>
+ */
+public final class IntegerPositionHandler implements DataTypePositionHandler<Long> {
+
+    /**
+     * Read column value from the current row of the result set.
+     *
+     * @param resultSet result set positioned on a row
+     * @param columnIndex column index, starting from 1
+     * @return column value, or {@code null} if the column is SQL NULL
+     * @throws SQLException if the column cannot be read
+     */
+    @Override
+    public Long readColumnValue(final ResultSet resultSet, final int columnIndex) throws SQLException {
+        long result = resultSet.getLong(columnIndex);
+        // getLong reports SQL NULL as 0 (e.g. an aggregate over zero rows); return null so it is not mistaken for a real value 0.
+        return resultSet.wasNull() ? null : result;
+    }
+
+    /**
+     * Bind value to the prepared statement parameter as {@code long}.
+     *
+     * @param preparedStatement prepared statement
+     * @param parameterIndex parameter index, starting from 1
+     * @param value value to bind, must not be {@code null} because it is unboxed
+     * @throws SQLException if the parameter cannot be set
+     */
+    @Override
+    public void setPreparedStatementValue(final PreparedStatement preparedStatement, final int parameterIndex, final Long value) throws SQLException {
+        preparedStatement.setLong(parameterIndex, value);
+    }
+
+    /**
+     * Create integer primary key ingest position from the value pair.
+     *
+     * @param lowerValue lower value, passed through unchanged
+     * @param upperValue upper value, passed through unchanged
+     * @return integer primary key ingest position
+     */
+    @Override
+    public IntegerPrimaryKeyIngestPosition createIngestPosition(final Long lowerValue, final Long upperValue) {
+        return new IntegerPrimaryKeyIngestPosition(lowerValue, upperValue);
+    }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index b392abd73e1..0f8712ef837 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -76,12 +76,12 @@ public final class InventoryPositionExactCalculator {
return positionHandler.createIngestPosition(null, null);
}
long count = resultSet.getLong(2);
- log.info("First records count is 0, return. First query SQL:
{}", firstQuerySQL);
+ T minValue = positionHandler.readColumnValue(resultSet, 3);
+ T maxValue = positionHandler.readColumnValue(resultSet, 1);
+ log.info("First records count: {}, min value: {}, max value:
{}, sharding size: {}, first query SQL: {}", count, minValue, maxValue,
shardingSize, firstQuerySQL);
if (0 == count) {
return positionHandler.createIngestPosition(null, null);
}
- T minValue = positionHandler.readColumnValue(resultSet, 3);
- T maxValue = positionHandler.readColumnValue(resultSet, 1);
return positionHandler.createIngestPosition(minValue,
maxValue);
}
} catch (final SQLException ex) {
@@ -107,14 +107,14 @@ public final class InventoryPositionExactCalculator {
if (!resultSet.next()) {
break;
}
- T maxValue = positionHandler.readColumnValue(resultSet, 1);
int count = resultSet.getInt(2);
if (0 == count) {
- log.info("Done. Later records count: {}, later query
SQL: {}, last lower value: {}", recordsCount, laterQuerySQL, lowerValue);
+ log.info("Done. Later records count: {}, last lower
value: {}, sharding size: {}, later query SQL: {}", recordsCount, lowerValue,
shardingSize, laterQuerySQL);
break;
}
recordsCount += count;
T minValue = positionHandler.readColumnValue(resultSet, 3);
+ T maxValue = positionHandler.readColumnValue(resultSet, 1);
result.add(positionHandler.createIngestPosition(minValue,
maxValue));
lowerValue = maxValue;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
index 27b689b53d2..847ac735a95 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
@@ -17,8 +17,7 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.text.RandomStringGenerator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordTableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
@@ -27,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.quer
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -58,7 +58,8 @@ class RecordTableInventoryCheckCalculatorTest {
@BeforeAll
static void setUp() throws Exception {
- dataSource = new PipelineDataSource(createHikariDataSource("calc_" +
RandomStringUtils.randomAlphanumeric(9)),
TypedSPILoader.getService(DatabaseType.class, "H2"));
+ String databaseName = "check_calc_" +
RandomStringGenerator.builder().withinRange('a', 'z').build().generate(9);
+ dataSource = new
PipelineDataSource(DataSourceTestUtils.createHikariDataSource(databaseName),
TypedSPILoader.getService(DatabaseType.class, "H2"));
createTableAndInitData(dataSource);
}
@@ -67,18 +68,6 @@ class RecordTableInventoryCheckCalculatorTest {
dataSource.close();
}
- private static HikariDataSource createHikariDataSource(final String
databaseName) {
- HikariDataSource result = new HikariDataSource();
-
result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL",
databaseName));
- result.setUsername("root");
- result.setPassword("root");
- result.setMaximumPoolSize(10);
- result.setMinimumIdle(2);
- result.setConnectionTimeout(15L * 1000L);
- result.setIdleTimeout(40L * 1000L);
- return result;
- }
-
private static void createTableAndInitData(final PipelineDataSource
dataSource) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
String sql = "CREATE TABLE t_order (user_id INT NOT NULL, order_id
INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
new file mode 100644
index 00000000000..f155cd679fe
--- /dev/null
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import org.apache.commons.text.RandomStringGenerator;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test of {@link InventoryPositionExactCalculator} with {@link IntegerPositionHandler} against an in-memory H2 table.
+ */
+class InventoryIntegerPositionExactCalculatorTest {
+
+    private static PipelineDataSource dataSource;
+
+    @BeforeAll
+    static void setUp() throws Exception {
+        String databaseName = "pos_i_calc_" + RandomStringGenerator.builder().withinRange('a', 'z').build().generate(9);
+        dataSource = new PipelineDataSource(DataSourceTestUtils.createHikariDataSource(databaseName), TypedSPILoader.getService(DatabaseType.class, "H2"));
+        createTableAndInitData(dataSource);
+    }
+
+    @AfterAll
+    static void tearDown() {
+        dataSource.close();
+    }
+
+    /**
+     * Create table t_order and insert 11 rows: order_id is 1..11 without gaps, user_id 3 is repeated five times and user_id 6 twice.
+     *
+     * @param dataSource data source to initialize
+     * @throws SQLException if table creation or insertion fails
+     */
+    private static void createTableAndInitData(final PipelineDataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            String createTableSQL = "CREATE TABLE t_order (user_id INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, order_id))";
+            // Both statements are opened in try-with-resources so they are closed even if execution fails.
+            try (PreparedStatement createTableStatement = connection.prepareStatement(createTableSQL)) {
+                createTableStatement.execute();
+            }
+            try (PreparedStatement preparedStatement = connection.prepareStatement("INSERT INTO t_order (user_id, order_id, status) VALUES (?, ?, ?)")) {
+                insertRecord(preparedStatement, 1, 1);
+                insertRecord(preparedStatement, 2, 2);
+                insertRecord(preparedStatement, 3, 3);
+                insertRecord(preparedStatement, 3, 4);
+                insertRecord(preparedStatement, 3, 5);
+                insertRecord(preparedStatement, 3, 6);
+                insertRecord(preparedStatement, 3, 7);
+                insertRecord(preparedStatement, 4, 8);
+                insertRecord(preparedStatement, 5, 9);
+                insertRecord(preparedStatement, 6, 10);
+                insertRecord(preparedStatement, 6, 11);
+            }
+        }
+    }
+
+    private static void insertRecord(final PreparedStatement preparedStatement, final int userId, final int orderId) throws SQLException {
+        preparedStatement.setInt(1, userId);
+        preparedStatement.setInt(2, orderId);
+        preparedStatement.setString(3, "OK");
+        preparedStatement.executeUpdate();
+    }
+
+    /**
+     * Splitting by order_id (values 1..11) with sharding size 3 is expected to give four ranges, the last one holding the remaining two values.
+     */
+    @Test
+    void assertGetPositionsWithOrderIdUniqueKey() {
+        List<IngestPosition> actual = InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, "t_order"), "order_id", 3, dataSource, new IntegerPositionHandler());
+        assertThat(actual.size(), is(4));
+        for (IngestPosition each : actual) {
+            assertThat(each, instanceOf(IntegerPrimaryKeyIngestPosition.class));
+        }
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new IntegerPrimaryKeyIngestPosition(1L, 3L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new IntegerPrimaryKeyIngestPosition(4L, 6L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(2), new IntegerPrimaryKeyIngestPosition(7L, 9L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(3), new IntegerPrimaryKeyIngestPosition(10L, 11L));
+    }
+
+    private void assertIntegerPrimaryKeyIngestPosition0(final IngestPosition actual, final IntegerPrimaryKeyIngestPosition expected) {
+        IntegerPrimaryKeyIngestPosition position = (IntegerPrimaryKeyIngestPosition) actual;
+        assertThat(position.getType(), is(expected.getType()));
+        assertThat(position.getBeginValue(), is(expected.getBeginValue()));
+        assertThat(position.getEndValue(), is(expected.getEndValue()));
+    }
+
+    /**
+     * Splitting by user_id, the first column of the two-column primary key, is expected to give two ranges; the repeated user_id values 3 and 6 each lie inside a single range.
+     */
+    @Test
+    void assertGetPositionsWithMultiColumnUniqueKeys() {
+        List<IngestPosition> actual = InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, "t_order"), "user_id", 3, dataSource, new IntegerPositionHandler());
+        assertThat(actual.size(), is(2));
+        for (IngestPosition each : actual) {
+            assertThat(each, instanceOf(IntegerPrimaryKeyIngestPosition.class));
+        }
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), new IntegerPrimaryKeyIngestPosition(1L, 3L));
+        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), new IntegerPrimaryKeyIngestPosition(4L, 6L));
+    }
+}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
new file mode 100644
index 00000000000..eb57788e998
--- /dev/null
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/DataSourceTestUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+/**
+ * Data source test utilities.
+ */
+// Lombok generates a private constructor so this static-only utility class cannot be instantiated.
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class DataSourceTestUtils {
+
+    /**
+     * Create Hikari data source for an in-memory H2 database in MySQL mode.
+     *
+     * <p>The caller owns the returned data source and is responsible for closing it.</p>
+     *
+     * @param databaseName database name
+     * @return Hikari data source
+     */
+    public static HikariDataSource createHikariDataSource(final String databaseName) {
+        HikariDataSource result = new HikariDataSource();
+        result.setJdbcUrl(String.format("jdbc:h2:mem:%s;DATABASE_TO_UPPER=false;MODE=MySQL", databaseName));
+        result.setUsername("root");
+        result.setPassword("root");
+        result.setMaximumPoolSize(10);
+        result.setMinimumIdle(2);
+        // HikariCP timeouts are given in milliseconds: 15 seconds to obtain a connection, 40 seconds before an idle connection may be retired.
+        result.setConnectionTimeout(15L * 1000L);
+        result.setIdleTimeout(40L * 1000L);
+        return result;
+    }
+}