This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a0521e56105 Improve InventoryPositionEstimatedCalculator: support
possible null unique key value (#37522)
a0521e56105 is described below
commit a0521e56105a224d8429771e0136ec5442ca8957
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Dec 26 09:27:55 2025 +0800
Improve InventoryPositionEstimatedCalculator: support possible null unique
key value (#37522)
* Refactor InventoryPositionEstimatedCalculatorTest
* Replace Range with QueryRange
* Make compatible with null unique key value
* Update RELEASE-NOTES.md
* Update InventoryTaskSplitterTest.assertSplitWithEmptyTable
---
RELEASE-NOTES.md | 1 +
.../ingest/dumper/inventory/query/QueryRange.java | 11 ++++++
.../position/InventoryPositionCalculator.java | 4 +-
.../InventoryPositionEstimatedCalculator.java | 17 +++++----
.../InventoryPositionEstimatedCalculatorTest.java | 44 +++++++++++-----------
.../splitter/InventoryTaskSplitterTest.java | 5 ++-
6 files changed, 49 insertions(+), 33 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index bbd3f95e7f4..9da11c887cc 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -67,6 +67,7 @@
1. Pipeline: Improve "alter transmission rule": verify STREAM_CHANNEL TYPE
NAME - [#36864](https://github.com/apache/shardingsphere/pull/36864)
1. Pipeline: InventoryDumperContextSplitter supports multi-columns unique key
first integer column splitting -
[#36935](https://github.com/apache/shardingsphere/pull/36935)
1. Pipeline: Support unique key first integer column exact splitting -
[#37517](https://github.com/apache/shardingsphere/pull/37517)
+1. Pipeline: Improve InventoryPositionEstimatedCalculator: support possible
null unique key value -
[#37522](https://github.com/apache/shardingsphere/pull/37522)
1. Encrypt: Support handling show create view result decoration in encrypt -
[#37299](https://github.com/apache/shardingsphere/pull/37299)
1. JDBC: Enhance ResultSetUtils to support flexible string date/time
conversions - [37424](https://github.com/apache/shardingsphere/pull/37424)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
index 87c585b7d7d..5f2469fdfa0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/QueryRange.java
@@ -32,4 +32,15 @@ public final class QueryRange {
private final boolean lowerInclusive;
private final Object upper;
+
+ /**
+ * Create closed query range.
+ *
+ * @param lower lower value
+ * @param upper upper value
+ * @return query range
+ */
+ public static QueryRange closed(final Object lower, final Object upper) {
+ return new QueryRange(lower, true, upper);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 664842a6e33..3a64f704d4e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -18,8 +18,8 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position;
import lombok.RequiredArgsConstructor;
-import org.apache.commons.lang3.Range;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
@@ -59,7 +59,7 @@ public final class InventoryPositionCalculator {
int firstColumnDataType = uniqueKeyColumns.get(0).getDataType();
if (dataTypeOption.isIntegerDataType(firstColumnDataType)) {
String uniqueKey = uniqueKeyColumns.get(0).getName();
- Range<Long> uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, tableName, uniqueKey);
+ QueryRange uniqueKeyValuesRange =
InventoryPositionEstimatedCalculator.getIntegerUniqueKeyValuesRange(sourceDataSource,
schemaName, tableName, uniqueKey);
return
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
uniqueKeyValuesRange, shardingSize);
}
if (1 == uniqueKeyColumns.size() &&
dataTypeOption.isStringDataType(firstColumnDataType)) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 322711990cd..9216a35a9f1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
import org.apache.commons.lang3.Range;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
@@ -52,7 +53,7 @@ public final class InventoryPositionEstimatedCalculator {
* @return unique key values range
* @throws SplitPipelineJobByUniqueKeyException if an error occurs while
getting unique key values range
*/
- public static Range<Long> getIntegerUniqueKeyValuesRange(final
PipelineDataSource dataSource, final String schemaName, final String tableName,
final String uniqueKey) {
+ public static QueryRange getIntegerUniqueKeyValuesRange(final
PipelineDataSource dataSource, final String schemaName, final String tableName,
final String uniqueKey) {
PipelinePrepareSQLBuilder pipelineSQLBuilder = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType());
String sql =
pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(schemaName, tableName,
uniqueKey);
try (
@@ -60,7 +61,7 @@ public final class InventoryPositionEstimatedCalculator {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
resultSet.next();
- return Range.of(resultSet.getLong(1), resultSet.getLong(2));
+ return QueryRange.closed(resultSet.getLong(1),
resultSet.getLong(2));
} catch (final SQLException ex) {
throw new SplitPipelineJobByUniqueKeyException(tableName,
uniqueKey, ex);
}
@@ -74,14 +75,16 @@ public final class InventoryPositionEstimatedCalculator {
* @param shardingSize sharding size
* @return positions
*/
- public static List<IngestPosition> getIntegerPositions(final long
tableRecordsCount, final Range<Long> uniqueKeyValuesRange, final long
shardingSize) {
- if (0 == tableRecordsCount) {
- return Collections.singletonList(new
IntegerPrimaryKeyIngestPosition(0L, 0L));
+ public static List<IngestPosition> getIntegerPositions(final long
tableRecordsCount, final QueryRange uniqueKeyValuesRange, final long
shardingSize) {
+ Long minimum = (Long) uniqueKeyValuesRange.getLower();
+ Long maximum = (Long) uniqueKeyValuesRange.getUpper();
+ if (0 == tableRecordsCount || null == minimum || null == maximum) {
+ return Collections.singletonList(new
IntegerPrimaryKeyIngestPosition(null, null));
}
List<IngestPosition> result = new LinkedList<>();
long splitCount = tableRecordsCount / shardingSize +
(tableRecordsCount % shardingSize > 0 ? 1 : 0);
- long interval =
BigInteger.valueOf(uniqueKeyValuesRange.getMaximum()).subtract(BigInteger.valueOf(uniqueKeyValuesRange.getMinimum())).divide(BigInteger.valueOf(splitCount)).longValue();
- IntervalToRangeIterator rangeIterator = new
IntervalToRangeIterator(uniqueKeyValuesRange.getMinimum(),
uniqueKeyValuesRange.getMaximum(), interval);
+ long interval =
BigInteger.valueOf(maximum).subtract(BigInteger.valueOf(minimum)).divide(BigInteger.valueOf(splitCount)).longValue();
+ IntervalToRangeIterator rangeIterator = new
IntervalToRangeIterator(minimum, maximum, interval);
while (rangeIterator.hasNext()) {
Range<Long> range = rangeIterator.next();
result.add(new IntegerPrimaryKeyIngestPosition(range.getMinimum(),
range.getMaximum()));
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index 2940eace7f0..c1c6feed291 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -17,7 +17,7 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated;
-import org.apache.commons.lang3.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.junit.jupiter.api.Test;
@@ -32,34 +32,37 @@ class InventoryPositionEstimatedCalculatorTest {
@Test
void assertGetIntegerPositions() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.of(1L,
600L), 100L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
QueryRange.closed(1L, 600L), 100L);
assertThat(actualPositions.size(), is(2));
- for (IngestPosition each : actualPositions) {
- assertThat(each, isA(IntegerPrimaryKeyIngestPosition.class));
- }
- assertPosition(new IntegerPrimaryKeyIngestPosition(1L, 300L),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
- assertPosition(new IntegerPrimaryKeyIngestPosition(301L, 600L),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(1));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(1L, 300L));
+ assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(301L, 600L));
}
- private void assertPosition(final IntegerPrimaryKeyIngestPosition
expected, final IntegerPrimaryKeyIngestPosition actual) {
- assertThat(actual.getBeginValue(), is(expected.getBeginValue()));
- assertThat(actual.getEndValue(), is(expected.getEndValue()));
+ private void assertPosition(final IngestPosition actual, final
IntegerPrimaryKeyIngestPosition expected) {
+ assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
+ assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(),
is(expected.getBeginValue()));
+ assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(),
is(expected.getEndValue()));
}
@Test
void assertGetIntegerPositionsWithZeroTotalRecordsCount() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(0L, Range.of(0L, 0L),
1L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(0L,
QueryRange.closed(0L, 1L), 1L);
assertThat(actualPositions.size(), is(1));
- assertThat(actualPositions.get(0),
isA(IntegerPrimaryKeyIngestPosition.class));
- assertPosition(new IntegerPrimaryKeyIngestPosition(0L, 0L),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(null, null));
+ }
+
+ @Test
+ void assertGetIntegerPositionsWithNullValue() {
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
QueryRange.closed(null, null), 1L);
+ assertThat(actualPositions.size(), is(1));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(null, null));
}
@Test
void assertGetIntegerPositionsWithTheSameMinMax() {
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, Range.of(5L,
5L), 100L);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
QueryRange.closed(5L, 5L), 100L);
assertThat(actualPositions.size(), is(1));
- assertThat(actualPositions.get(0),
isA(IntegerPrimaryKeyIngestPosition.class));
- assertPosition(new IntegerPrimaryKeyIngestPosition(5L, 5L),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(5L, 5L));
}
@Test
@@ -68,12 +71,9 @@ class InventoryPositionEstimatedCalculatorTest {
long shardingSize = tableRecordsCount / 2L;
long minimum = Long.MIN_VALUE + 1L;
long maximum = Long.MAX_VALUE;
- List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
Range.of(minimum, maximum), shardingSize);
+ List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
QueryRange.closed(minimum, maximum), shardingSize);
assertThat(actualPositions.size(), is(2));
- for (IngestPosition each : actualPositions) {
- assertThat(each, isA(IntegerPrimaryKeyIngestPosition.class));
- }
- assertPosition(new IntegerPrimaryKeyIngestPosition(minimum, 0L),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(0));
- assertPosition(new IntegerPrimaryKeyIngestPosition(1L, maximum),
(IntegerPrimaryKeyIngestPosition) actualPositions.get(1));
+ assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(minimum, 0L));
+ assertPosition(actualPositions.get(1), new
IntegerPrimaryKeyIngestPosition(1L, maximum));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 36def006535..6df164000d5 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -45,6 +45,7 @@ import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class InventoryTaskSplitterTest {
@@ -88,8 +89,8 @@ class InventoryTaskSplitterTest {
List<InventoryTask> actual =
inventoryTaskSplitter.split(jobItemContext);
assertThat(actual.size(), is(1));
InventoryTask task = actual.get(0);
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue(), is(0L));
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue(), is(0L));
+ assertNull(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue());
+ assertNull(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue());
}
@Test