This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new efcc0097085 Improve InventoryPositionCalculator to support BigInteger
(#37573)
efcc0097085 is described below
commit efcc00970850d469b4016badec17e027e888ed5c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Dec 29 19:35:09 2025 +0800
Improve InventoryPositionCalculator to support BigInteger (#37573)
* Improve InventoryPositionCalculator to support BigInteger
---
.../calculator/InventoryDataSparsenessCalculator.java | 9 +++++----
.../position/InventoryPositionCalculator.java | 3 ++-
.../estimated/InventoryPositionEstimatedCalculator.java | 17 ++++++++++-------
.../InventoryDataSparsenessCalculatorTest.java | 14 ++++++++------
.../InventoryPositionEstimatedCalculatorTest.java | 17 +++++++++--------
5 files changed, 34 insertions(+), 26 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculator.java
index 0026122f621..36837b7f468 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculator.java
@@ -23,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.math.RoundingMode;
/**
@@ -43,12 +44,12 @@ public final class InventoryDataSparsenessCalculator {
* @param uniqueKeyValuesRange unique key values range
* @return true if sparse
*/
- public static boolean isIntegerUniqueKeyDataSparse(final long
tableRecordsCount, final Range<Long> uniqueKeyValuesRange) {
+ public static boolean isIntegerUniqueKeyDataSparse(final long
tableRecordsCount, final Range<BigInteger> uniqueKeyValuesRange) {
boolean result = false;
- Long lowerBound = uniqueKeyValuesRange.getLowerBound();
- Long upperBound = uniqueKeyValuesRange.getUpperBound();
+ BigInteger lowerBound = uniqueKeyValuesRange.getLowerBound();
+ BigInteger upperBound = uniqueKeyValuesRange.getUpperBound();
if (tableRecordsCount >= EXACT_SPLITTING_RECORDS_COUNT_THRESHOLD &&
null != lowerBound && null != upperBound) {
- BigDecimal multiple =
BigDecimal.valueOf(upperBound).subtract(BigDecimal.valueOf(lowerBound)).add(BigDecimal.ONE)
+ BigDecimal multiple = new BigDecimal(upperBound).subtract(new
BigDecimal(lowerBound)).add(BigDecimal.ONE)
.divide(BigDecimal.valueOf(tableRecordsCount), 2,
RoundingMode.HALF_UP);
if (multiple.compareTo(MULTIPLE_THRESHOLD) >= 0) {
log.info("Table is sparse for integer unique key, table
records count: {}, unique key values range: {}, multiple: {}",
tableRecordsCount, uniqueKeyValuesRange, multiple);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 42a9b781fd8..7b7ec222453 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.database.connector.core.metadata.database.metad
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
@@ -70,7 +71,7 @@ public final class InventoryPositionCalculator {
private List<IngestPosition> getIntegerPositions() {
String uniqueKey = uniqueKeyColumns.get(0).getName();
- Range<Long> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(qualifiedTable,
uniqueKey, dataSource);
+ Range<BigInteger> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(qualifiedTable,
uniqueKey, dataSource);
if
(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(tableRecordsCount,
uniqueKeyValuesRange)) {
return
InventoryPositionExactCalculator.getPositions(qualifiedTable, uniqueKey,
shardingSize, dataSource, new IntegerPositionHandler());
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 901b16cffb8..0fd2663af9f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -52,7 +53,7 @@ public final class InventoryPositionEstimatedCalculator {
* @return unique key values range
* @throws SplitPipelineJobByUniqueKeyException if an error occurs while
getting unique key values range
*/
- public static Range<Long> getIntegerUniqueKeyValuesRange(final
QualifiedTable qualifiedTable, final String uniqueKey, final PipelineDataSource
dataSource) {
+ public static Range<BigInteger> getIntegerUniqueKeyValuesRange(final
QualifiedTable qualifiedTable, final String uniqueKey, final PipelineDataSource
dataSource) {
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey);
try (
@@ -60,7 +61,9 @@ public final class InventoryPositionEstimatedCalculator {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
resultSet.next();
- return Range.closed(resultSet.getLong(1), resultSet.getLong(2));
+ BigDecimal lowerBound = resultSet.getBigDecimal(1);
+ BigDecimal upperBound = resultSet.getBigDecimal(2);
+ return Range.closed(null == lowerBound ? null :
lowerBound.toBigInteger(), null == upperBound ? null :
upperBound.toBigInteger());
} catch (final SQLException ex) {
throw new
SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey,
ex);
}
@@ -74,16 +77,16 @@ public final class InventoryPositionEstimatedCalculator {
* @param shardingSize sharding size
* @return positions
*/
- public static List<IngestPosition> getIntegerPositions(final long
tableRecordsCount, final Range<Long> uniqueKeyValuesRange, final long
shardingSize) {
- Long lowerBound = uniqueKeyValuesRange.getLowerBound();
- Long upperBound = uniqueKeyValuesRange.getUpperBound();
+ public static List<IngestPosition> getIntegerPositions(final long
tableRecordsCount, final Range<BigInteger> uniqueKeyValuesRange, final long
shardingSize) {
+ BigInteger lowerBound = uniqueKeyValuesRange.getLowerBound();
+ BigInteger upperBound = uniqueKeyValuesRange.getUpperBound();
if (0 == tableRecordsCount || null == lowerBound || null ==
upperBound) {
return Collections.singletonList(new
IntegerPrimaryKeyIngestPosition(null, null));
}
List<IngestPosition> result = new LinkedList<>();
long splitCount = tableRecordsCount / shardingSize +
(tableRecordsCount % shardingSize > 0 ? 1 : 0);
- BigInteger stepSize =
BigInteger.valueOf(upperBound).subtract(BigInteger.valueOf(lowerBound)).divide(BigInteger.valueOf(splitCount));
- IntegerRangeSplittingIterator rangeIterator = new
IntegerRangeSplittingIterator(BigInteger.valueOf(lowerBound),
BigInteger.valueOf(upperBound), stepSize);
+ BigInteger stepSize =
upperBound.subtract(lowerBound).divide(BigInteger.valueOf(splitCount));
+ IntegerRangeSplittingIterator rangeIterator = new
IntegerRangeSplittingIterator(lowerBound, upperBound, stepSize);
while (rangeIterator.hasNext()) {
Range<BigInteger> range = rangeIterator.next();
result.add(new
IntegerPrimaryKeyIngestPosition(range.getLowerBound().longValue(),
range.getUpperBound().longValue()));
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
index 6979188719d..b88e1fa59ec 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
+
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,11 +29,11 @@ class InventoryDataSparsenessCalculatorTest {
@Test
void assertIsIntegerUniqueKeyDataSparse() {
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L,
Range.closed(1L, 1000000L)));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(null, 1000000L)));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(1L, null)));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(1L, 1494000L)));
-
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(1L, 1495000L)));
-
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(1L, 1500000L)));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1000000L))));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(null, BigInteger.valueOf(1000000L))));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, null)));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1494000L))));
+
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1495000L))));
+
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1500000L))));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index 7fd1b129f4b..f16feb375af 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.junit.jupiter.api.Test;
+import java.math.BigInteger;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
@@ -32,7 +33,7 @@ class InventoryPositionEstimatedCalculatorTest {
@Test
void assertGetIntegerPositions() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.closed(1L,
600L), 100L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(600L)), 100L);
assertThat(actualPositions.size(), is(2));
assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(1L, 300L));
assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(301L, 600L));
@@ -46,7 +47,7 @@ class InventoryPositionEstimatedCalculatorTest {
@Test
void assertGetIntegerPositionsWithZeroTotalRecordsCount() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(0L, Range.closed(0L,
1L), 1L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(0L,
Range.closed(BigInteger.ZERO, BigInteger.ONE), 1L);
assertThat(actualPositions.size(), is(1));
assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(null, null));
}
@@ -60,7 +61,7 @@ class InventoryPositionEstimatedCalculatorTest {
@Test
void assertGetIntegerPositionsWithTheSameMinMax() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.closed(5L,
5L), 100L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.valueOf(5L), BigInteger.valueOf(5L)), 100L);
assertThat(actualPositions.size(), is(1));
assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(5L, 5L));
}
@@ -69,11 +70,11 @@ class InventoryPositionEstimatedCalculatorTest {
void assertGetIntegerPositionsOverflow() {
long tableRecordsCount = Long.MAX_VALUE - 1L;
long shardingSize = tableRecordsCount / 2L;
- long minimum = Long.MIN_VALUE + 1L;
- long maximum = Long.MAX_VALUE;
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
Range.closed(minimum, maximum), shardingSize);
+ BigInteger lowerBound = BigInteger.valueOf(Long.MIN_VALUE + 1L);
+ BigInteger upperBound = BigInteger.valueOf(Long.MAX_VALUE);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
Range.closed(lowerBound, upperBound), shardingSize);
assertThat(actualPositions.size(), is(2));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(minimum, 0L));
- assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(1L, maximum));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(lowerBound.longValue(), 0L));
+ assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(1L, upperBound.longValue()));
}
}