This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aa3c040b83f Replace DataTypePositionHandler.createIngestPosition param
to Range (#37592)
aa3c040b83f is described below
commit aa3c040b83f7344946d66bd3ff5cb50e5db30c2c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 31 17:28:27 2025 +0800
Replace DataTypePositionHandler.createIngestPosition param to Range (#37592)
---
.../calculator/position/exact/DataTypePositionHandler.java | 6 +++---
.../calculator/position/exact/IntegerPositionHandler.java | 4 ++--
.../position/exact/InventoryPositionExactCalculator.java | 9 +++++----
.../calculator/position/exact/StringPositionHandler.java | 4 ++--
4 files changed, 12 insertions(+), 11 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
index 040a715eae8..e39c0292138 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import java.sql.PreparedStatement;
@@ -53,9 +54,8 @@ public interface DataTypePositionHandler<T> {
/**
* Create ingest position.
*
- * @param lowerBound lower bound
- * @param upperBound upper bound
+ * @param range range
* @return ingest position
*/
- UniqueKeyIngestPosition<T> createIngestPosition(T lowerBound, T
upperBound);
+ UniqueKeyIngestPosition<T> createIngestPosition(Range<T> range);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
index 0febb10491d..1e1a865ed7c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -43,7 +43,7 @@ public final class IntegerPositionHandler implements
DataTypePositionHandler<Big
}
@Override
- public UniqueKeyIngestPosition<BigInteger> createIngestPosition(final
BigInteger lowerBound, final BigInteger upperBound) {
- return UniqueKeyIngestPosition.ofInteger(Range.closed(lowerBound,
upperBound));
+ public UniqueKeyIngestPosition<BigInteger> createIngestPosition(final
Range<BigInteger> range) {
+ return UniqueKeyIngestPosition.ofInteger(range);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index cc53f6c86f9..8bffa7ceafb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
@@ -73,16 +74,16 @@ public final class InventoryPositionExactCalculator {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
log.info("No any record, return. First query SQL: {}",
firstQuerySQL);
- return positionHandler.createIngestPosition(null, null);
+ return
positionHandler.createIngestPosition(Range.closed(null, null));
}
long count = resultSet.getLong(2);
T minValue = positionHandler.readColumnValue(resultSet, 3);
T maxValue = positionHandler.readColumnValue(resultSet, 1);
log.info("First records count: {}, min value: {}, max value:
{}, sharding size: {}, first query SQL: {}", count, minValue, maxValue,
shardingSize, firstQuerySQL);
if (0 == count) {
- return positionHandler.createIngestPosition(null, null);
+ return
positionHandler.createIngestPosition(Range.closed(null, null));
}
- return positionHandler.createIngestPosition(minValue,
maxValue);
+ return
positionHandler.createIngestPosition(Range.closed(minValue, maxValue));
}
} catch (final SQLException ex) {
throw new
SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey,
ex);
@@ -115,7 +116,7 @@ public final class InventoryPositionExactCalculator {
recordsCount += count;
T minValue = positionHandler.readColumnValue(resultSet, 3);
T maxValue = positionHandler.readColumnValue(resultSet, 1);
- result.add(positionHandler.createIngestPosition(minValue,
maxValue));
+
result.add(positionHandler.createIngestPosition(Range.closed(minValue,
maxValue)));
lowerBound = maxValue;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
index 43086e37d10..ac8975e2dc8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
@@ -40,7 +40,7 @@ public final class StringPositionHandler implements
DataTypePositionHandler<Stri
}
@Override
- public UniqueKeyIngestPosition<String> createIngestPosition(final String
lowerBound, final String upperBound) {
- return UniqueKeyIngestPosition.ofString(Range.closed(lowerBound,
upperBound));
+ public UniqueKeyIngestPosition<String> createIngestPosition(final
Range<String> range) {
+ return UniqueKeyIngestPosition.ofString(range);
}
}