This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a0521e56105 Improve InventoryPositionEstimatedCalculator: support possible null unique key value (#37522)
a0521e56105 is described below

commit a0521e56105a224d8429771e0136ec5442ca8957
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Dec 26 09:27:55 2025 +0800

    Improve InventoryPositionEstimatedCalculator: support possible null unique key value (#37522)
    
    * Refactor InventoryPositionEstimatedCalculatorTest
    
    * Replace Range with QueryRange
    
    * Make compatible with null unique key value
    
    * Update RELEASE-NOTES.md
    
    * Update InventoryTaskSplitterTest.assertSplitWithEmptyTable
---
 RELEASE-NOTES.md                                   |  1 +
 .../ingest/dumper/inventory/query/QueryRange.java  | 11 ++++++
 .../position/InventoryPositionCalculator.java      |  4 +-
 .../InventoryPositionEstimatedCalculator.java      | 17 +++++----
 .../InventoryPositionEstimatedCalculatorTest.java  | 44 +++++++++++-----------
 .../splitter/InventoryTaskSplitterTest.java        |  5 ++-
 6 files changed, 49 insertions(+), 33 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index bbd3f95e7f4..9da11c887cc 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -67,6 +67,7 @@
 1. Pipeline: Improve "alter transmission rule": verify STREAM_CHANNEL TYPE 
NAME - [#36864](https://github.com/apache/shardingsphere/pull/36864)
 1. Pipeline: InventoryDumperContextSplitter supports multi-columns unique key 
first integer column splitting - 
[#36935](https://github.com/apache/shardingsphere/pull/36935)
 1. Pipeline: Support unique key first integer column exact splitting - 
[#37517](https://github.com/apache/shardingsphere/pull/37517)
+1. Pipeline: Improve InventoryPositionEstimatedCalculator: support possible 
null unique key value - 
[#37522](https://github.com/apache/shardingsphere/pull/37522)
 1. Encrypt: Support handling show create view result decoration in encrypt - 
[#37299](https://github.com/apache/shardingsphere/pull/37299)
 1. JDBC: Enhance ResultSetUtils to support flexible string date/time 
conversions - [37424](https://github.com/apache/shardingsphere/pull/37424)
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
index 87c585b7d7d..5f2469fdfa0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
@@ -32,4 +32,15 @@ public final class QueryRange {
     private final boolean lowerInclusive;
     
     private final Object upper;
+    
+    /**
+     * Create closed query range.
+     *
+     * @param lower lower value
+     * @param upper upper value
+     * @return query range
+     */
+    public static QueryRange closed(final Object lower, final Object upper) {
+        return new QueryRange(lower, true, upper);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 664842a6e33..3a64f704d4e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -18,8 +18,8 @@
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
@@ -59,7 +59,7 @@ public final class InventoryPositionCalculator {
         int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
         if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
             String uniqueKey = uniqueKeyColumns.get(0).getName();
-            Range<Long> uniqueKeyValuesRange = 
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
 schemaName, tableName, uniqueKey);
+            QueryRange uniqueKeyValuesRange = 
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
 schemaName, tableName, uniqueKey);
             return 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
uniqueKeyValuesRange, shardingSize);
         }
         if (1 == uniqueKeyColumns.size() && 
dataTypeOption.isStringDataType(firstColumnDataType)) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 322711990cd..9216a35a9f1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
@@ -52,7 +53,7 @@ public final class InventoryPositionEstimatedCalculator {
      * @return unique key values range
      * @throws SplitPipelineJobByUniqueKeyException if an error occurs while 
getting unique key values range
      */
-    public static Range<Long> getIntegerUniqueKeyValuesRange(final 
PipelineDataSource dataSource, final String schemaName, final String tableName, 
final String uniqueKey) {
+    public static QueryRange getIntegerUniqueKeyValuesRange(final 
PipelineDataSource dataSource, final String schemaName, final String tableName, 
final String uniqueKey) {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
         String sql = 
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName, 
uniqueKey);
         try (
@@ -60,7 +61,7 @@ public final class InventoryPositionEstimatedCalculator {
                 Statement statement = connection.createStatement();
                 ResultSet resultSet = statement.executeQuery(sql)) {
             resultSet.next();
-            return Range.of(resultSet.getLong(1), resultSet.getLong(2));
+            return QueryRange.closed(resultSet.getLong(1), 
resultSet.getLong(2));
         } catch (final SQLException ex) {
             throw new SplitPipelineJobByUniqueKeyException(tableName, 
uniqueKey, ex);
         }
@@ -74,14 +75,16 @@ public final class InventoryPositionEstimatedCalculator {
      * @param shardingSize sharding size
      * @return positions
      */
-    public static List<IngestPosition> getIntegerPositions(final long 
tableRecordsCount, final Range<Long> uniqueKeyValuesRange, final long 
shardingSize) {
-        if (0 == tableRecordsCount) {
-            return Collections.singletonList(new 
IntegerPrimaryKeyIngestPosition(0L, 0L));
+    public static List<IngestPosition> getIntegerPositions(final long 
tableRecordsCount, final QueryRange uniqueKeyValuesRange, final long 
shardingSize) {
+        Long minimum = (Long) uniqueKeyValuesRange.getLower();
+        Long maximum = (Long) uniqueKeyValuesRange.getUpper();
+        if (0 == tableRecordsCount || null == minimum || null == maximum) {
+            return Collections.singletonList(new 
IntegerPrimaryKeyIngestPosition(null, null));
         }
         List<IngestPosition> result = new LinkedList<>();
         long splitCount = tableRecordsCount / shardingSize + 
(tableRecordsCount % shardingSize > 0 ? 1 : 0);
-        long interval = 
BigInteger.valueOf(uniqueKeyValuesRange.getMaximum()).subtract(BigInteger.valueOf(uniqueKeyValuesRange.getMinimum())).divide(BigInteger.valueOf(splitCount)).longValue();
-        IntervalToRangeIterator rangeIterator = new 
IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(), 
uniqueKeyValuesRange.getMaximum(), interval);
+        long interval = 
BigInteger.valueOf(maximum).subtract(BigInteger.valueOf(minimum)).divide(BigInteger.valueOf(splitCount)).longValue();
+        IntervalToRangeIterator rangeIterator = new 
IntervalToRangeIterator(minimum, maximum, interval);
         while (rangeIterator.hasNext()) {
             Range<Long> range = rangeIterator.next();
             result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(), 
range.getMaximum()));
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index 2940eace7f0..c1c6feed291 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -17,7 +17,7 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated;
 
-import org.apache.commons.lang3.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import org.junit.jupiter.api.Test;
@@ -32,34 +32,37 @@ class InventoryPositionEstimatedCalculatorTest {
     
     @Test
     void assertGetIntegerPositions() {
-        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.of(1L, 
600L), 100L);
+        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
QueryRange.closed(1L, 600L), 100L);
         assertThat(actualPositions.size(), is(2));
-        for (IngestPosition each : actualPositions) {
-            assertThat(each, isA(IntegerPrimaryKeyIngestPosition.class));
-        }
-        assertPosition(new IntegerPrimaryKeyIngestPosition(1L, 300L), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
-        assertPosition(new IntegerPrimaryKeyIngestPosition(301L, 600L), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(1));
+        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(1L, 300L));
+        assertPosition(actualPositions.get(1), new 
IntegerPrimaryKeyIngestPosition(301L, 600L));
     }
     
-    private void assertPosition(final IntegerPrimaryKeyIngestPosition 
expected, final IntegerPrimaryKeyIngestPosition actual) {
-        assertThat(actual.getBeginValue(), is(expected.getBeginValue()));
-        assertThat(actual.getEndValue(), is(expected.getEndValue()));
+    private void assertPosition(final IngestPosition actual, final 
IntegerPrimaryKeyIngestPosition expected) {
+        assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
+        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(), 
is(expected.getBeginValue()));
+        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(), 
is(expected.getEndValue()));
     }
     
     @Test
     void assertGetIntegerPositionsWithZeroTotalRecordsCount() {
-        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(0L, Range.of(0L, 0L), 
1L);
+        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(0L, 
QueryRange.closed(0L, 1L), 1L);
         assertThat(actualPositions.size(), is(1));
-        assertThat(actualPositions.get(0), 
isA(IntegerPrimaryKeyIngestPosition.class));
-        assertPosition(new IntegerPrimaryKeyIngestPosition(0L, 0L), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
+        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(null, null));
+    }
+    
+    @Test
+    void assertGetIntegerPositionsWithNullValue() {
+        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
QueryRange.closed(null, null), 1L);
+        assertThat(actualPositions.size(), is(1));
+        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(null, null));
     }
     
     @Test
     void assertGetIntegerPositionsWithTheSameMinMax() {
-        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.of(5L, 
5L), 100L);
+        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
QueryRange.closed(5L, 5L), 100L);
         assertThat(actualPositions.size(), is(1));
-        assertThat(actualPositions.get(0), 
isA(IntegerPrimaryKeyIngestPosition.class));
-        assertPosition(new IntegerPrimaryKeyIngestPosition(5L, 5L), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
+        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(5L, 5L));
     }
     
     @Test
@@ -68,12 +71,9 @@ class InventoryPositionEstimatedCalculatorTest {
         long shardingSize = tableRecordsCount / 2L;
         long minimum = Long.MIN_VALUE + 1L;
         long maximum = Long.MAX_VALUE;
-        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
Range.of(minimum, maximum), shardingSize);
+        List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
QueryRange.closed(minimum, maximum), shardingSize);
         assertThat(actualPositions.size(), is(2));
-        for (IngestPosition each : actualPositions) {
-            assertThat(each, isA(IntegerPrimaryKeyIngestPosition.class));
-        }
-        assertPosition(new IntegerPrimaryKeyIngestPosition(minimum, 0L), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
-        assertPosition(new IntegerPrimaryKeyIngestPosition(1L, maximum), 
(IntegerPrimaryKeyIngestPosition) actualPositions.get(1));
+        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(minimum, 0L));
+        assertPosition(actualPositions.get(1), new 
IntegerPrimaryKeyIngestPosition(1L, maximum));
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 36def006535..6df164000d5 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -45,6 +45,7 @@ import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class InventoryTaskSplitterTest {
@@ -88,8 +89,8 @@ class InventoryTaskSplitterTest {
         List<InventoryTask> actual = 
inventoryTaskSplitter.split(jobItemContext);
         assertThat(actual.size(), is(1));
         InventoryTask task = actual.get(0);
-        assertThat(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getBeginValue(), is(0L));
-        assertThat(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getEndValue(), is(0L));
+        assertNull(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getBeginValue());
+        assertNull(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getEndValue());
     }
     
     @Test

Reply via email to