This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4267dd7b28d Add UniqueKeyIngestPosition to replace
PrimaryKeyIngestPosition and PrimaryKeyIngestPositionFactory (#37589)
4267dd7b28d is described below
commit 4267dd7b28d0477d62b62f435ce9c0c38cdeae43
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 31 14:01:35 2025 +0800
Add UniqueKeyIngestPosition to replace PrimaryKeyIngestPosition and
PrimaryKeyIngestPositionFactory (#37589)
* Rename to unified lowerBound and upperBound
* Add UniqueKeyIngestPosition
* Add UniqueKeyIngestPositionTest
* Replace PrimaryKeyIngestPositionFactory with UniqueKeyIngestPosition
* Replace IntegerPrimaryKeyIngestPosition with UniqueKeyIngestPosition
* Replace StringPrimaryKeyIngestPosition with UniqueKeyIngestPosition
* Replace UnsupportedPrimaryKeyIngestPosition with UniqueKeyIngestPosition
* Replace PrimaryKeyIngestPosition with UniqueKeyIngestPosition
* Add UniqueKeyIngestPosition toString()
* Update code style & UT
---
.../position/TableCheckRangePosition.java | 6 +-
.../yaml/YamlTableCheckRangePositionSwapper.java | 4 +-
.../table/MatchingTableInventoryChecker.java | 8 +-
.../ingest/dumper/inventory/InventoryDumper.java | 9 +-
...niqueKeyInventoryDataRecordPositionCreator.java | 8 +-
.../query/IntegerRangeSplittingIterator.java | 2 +-
.../position/type/pk/PrimaryKeyIngestPosition.java | 49 ------
.../type/pk/PrimaryKeyIngestPositionFactory.java | 88 ----------
.../position/type/pk/UniqueKeyIngestPosition.java | 180 +++++++++++++++++++++
.../pk/type/IntegerPrimaryKeyIngestPosition.java | 57 -------
.../pk/type/StringPrimaryKeyIngestPosition.java | 48 ------
.../type/pk/type/UnsupportedKeyIngestPosition.java | 46 ------
.../YamlJobItemInventoryTasksProgressSwapper.java | 4 +-
.../position/InventoryPositionCalculator.java | 4 +-
.../InventoryPositionEstimatedCalculator.java | 7 +-
.../position/exact/DataTypePositionHandler.java | 4 +-
.../position/exact/IntegerPositionHandler.java | 7 +-
.../exact/InventoryPositionExactCalculator.java | 12 +-
.../position/exact/StringPositionHandler.java | 8 +-
.../splitter/InventoryDumperContextSplitter.java | 4 +-
...eKeyInventoryDataRecordPositionCreatorTest.java | 10 +-
.../pk/PrimaryKeyIngestPositionFactoryTest.java | 113 -------------
.../type/pk/UniqueKeyIngestPositionTest.java | 149 +++++++++++++++++
.../type/IntegerPrimaryKeyIngestPositionTest.java | 40 -----
.../type/StringPrimaryKeyIngestPositionTest.java | 36 -----
.../pk/type/UnsupportedKeyIngestPositionTest.java | 31 ----
.../progress/TransmissionJobItemProgressTest.java | 19 ++-
.../InventoryDataSparsenessCalculatorTest.java | 16 +-
.../InventoryPositionEstimatedCalculatorTest.java | 28 ++--
.../cdc/util/DataRecordResultConvertUtilsTest.java | 5 +-
.../ConsistencyCheckJobItemContextTest.java | 16 +-
.../MigrationDataConsistencyChecker.java | 4 +-
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 4 +-
...nventoryIntegerPositionExactCalculatorTest.java | 29 ++--
...InventoryStringPositionExactCalculatorTest.java | 29 ++--
.../splitter/InventoryTaskSplitterTest.java | 16 +-
.../data/pipeline/core/task/InventoryTaskTest.java | 7 +-
.../ConsistencyCheckJobExecutorCallbackTest.java | 8 +-
38 files changed, 472 insertions(+), 643 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
index 430bc6f4e4e..983a04288b2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
@@ -22,7 +22,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
/**
* Table check range position.
@@ -40,9 +40,9 @@ public final class TableCheckRangePosition {
private final String logicTableName;
- private final PrimaryKeyIngestPosition<?> sourceRange;
+ private final UniqueKeyIngestPosition<?> sourceRange;
- private final PrimaryKeyIngestPosition<?> targetRange;
+ private final UniqueKeyIngestPosition<?> targetRange;
private final String queryCondition;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
index 1236cd785fc..2e43175d7f5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
@@ -45,7 +45,7 @@ public final class YamlTableCheckRangePositionSwapper
implements YamlConfigurati
@Override
public TableCheckRangePosition swapToObject(final
YamlTableCheckRangePosition yamlConfig) {
return new TableCheckRangePosition(yamlConfig.getSplittingItem(),
yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(),
-
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()),
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()),
+ UniqueKeyIngestPosition.decode(yamlConfig.getSourceRange()),
UniqueKeyIngestPosition.decode(yamlConfig.getTargetRange()),
yamlConfig.getQueryCondition(),
yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(),
yamlConfig.isFinished(), yamlConfig.getMatched());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index dc856cb2dbb..d92c397660b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -77,8 +77,8 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
TableInventoryCalculateParameter sourceParam = new
TableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
TableCheckRangePosition checkRangePosition =
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
- sourceParam.setRange(Range.closed(null !=
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition()
: checkRangePosition.getSourceRange().getBeginValue(),
- checkRangePosition.getSourceRange().getEndValue()));
+ sourceParam.setRange(Range.closed(null !=
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition()
: checkRangePosition.getSourceRange().getLowerBound(),
+ checkRangePosition.getSourceRange().getUpperBound()));
TableInventoryCalculateParameter targetParam =
getTableInventoryCalculateParameter(param, checkRangePosition);
TableInventoryCalculator<TableInventoryCheckCalculatedResult>
sourceCalculator = buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
@@ -132,8 +132,8 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private TableInventoryCalculateParameter
getTableInventoryCalculateParameter(final TableInventoryCheckParameter param,
final TableCheckRangePosition checkRangePosition) {
TableInventoryCalculateParameter result = new
TableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
- result.setRange(Range.closed(null !=
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition()
: checkRangePosition.getTargetRange().getBeginValue(),
- checkRangePosition.getTargetRange().getEndValue()));
+ result.setRange(Range.closed(null !=
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition()
: checkRangePosition.getTargetRange().getLowerBound(),
+ checkRangePosition.getTargetRange().getUpperBound()));
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index dc8995ebc09..7f8d3361b2e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -27,15 +27,14 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -124,7 +123,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
List<String> columnNames = dumperContext.getQueryColumnNames();
TableInventoryCalculateParameter calculateParam = new
TableInventoryCalculateParameter(dataSource, table,
columnNames, dumperContext.getUniqueKeyColumns(),
QueryType.RANGE_QUERY, null);
- Range<?> range = Range.closed(((PrimaryKeyIngestPosition<?>)
initialPosition).getBeginValue(), ((PrimaryKeyIngestPosition<?>)
initialPosition).getEndValue());
+ Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>)
initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>)
initialPosition).getUpperBound());
calculateParam.setRange(range);
RecordTableInventoryDumpCalculator dumpCalculator = new
RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(),
StreamingRangeType.SMALL);
long rowCount = 0L;
@@ -132,7 +131,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
String firstUniqueKey =
calculateParam.getFirstUniqueKey().getName();
for (List<DataRecord> each :
dumpCalculator.calculate(calculateParam)) {
channel.push(Collections.unmodifiableList(each));
- IngestPosition position =
PrimaryKeyIngestPositionFactory.newInstance(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size()
- 1), firstUniqueKey), range.getUpperBound());
+ IngestPosition position =
UniqueKeyIngestPosition.newInstance(Range.closed(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size()
- 1), firstUniqueKey), range.getUpperBound()));
dumperContext.getCommonContext().setPosition(position);
rowCount += each.size();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
index a4d2fcf032e..3d454cde3fb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
@@ -19,9 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.pos
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -33,7 +33,7 @@ public final class
UniqueKeyInventoryDataRecordPositionCreator implements Invent
@Override
public IngestPosition create(final InventoryDumperContext dumperContext,
final ResultSet resultSet) throws SQLException {
- return PrimaryKeyIngestPositionFactory.newInstance(
-
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()),
((PrimaryKeyIngestPosition<?>)
dumperContext.getCommonContext().getPosition()).getEndValue());
+ return
UniqueKeyIngestPosition.newInstance(Range.closed(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()),
+ ((UniqueKeyIngestPosition<?>)
dumperContext.getCommonContext().getPosition()).getUpperBound()));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
index 51916a73bc6..91028b5657b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
@@ -36,7 +36,7 @@ public final class IntegerRangeSplittingIterator implements
Iterator<Range<BigIn
public IntegerRangeSplittingIterator(final BigInteger lowerBound, final
BigInteger upperBound, final BigInteger stepSize) {
if (lowerBound.compareTo(upperBound) > 0) {
- throw new IllegalArgumentException("lower bounder greater than
upper bound");
+ throw new IllegalArgumentException("lower bound greater than upper
bound");
}
if (stepSize.compareTo(BigInteger.ZERO) < 0) {
throw new IllegalArgumentException("step size is less than zero");
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
deleted file mode 100644
index 1a07b3fdf61..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-
-/**
- * Primary key ingest position.
- *
- * @param <T> type of value
- */
-public interface PrimaryKeyIngestPosition<T> extends IngestPosition {
-
- /**
- * Get begin value.
- *
- * @return begin value
- */
- T getBeginValue();
-
- /**
- * Get end value.
- *
- * @return end value
- */
- T getEndValue();
-
- /**
- * Get type.
- *
- * @return type
- */
- char getType();
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
deleted file mode 100644
index c2ba6deabdd..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
-
-import java.math.BigInteger;
-import java.util.List;
-
-/**
- * Primary key ingest position factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PrimaryKeyIngestPositionFactory {
-
- /**
- * Create new instance by string data.
- *
- * @param data string data
- * @return primary key position
- * @throws IllegalArgumentException illegal argument exception
- */
- public static PrimaryKeyIngestPosition<?> newInstance(final String data) {
- List<String> parts = Splitter.on(',').splitToList(data);
- Preconditions.checkArgument(3 == parts.size(), "Unknown primary key
position: " + data);
- Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid
primary key position type: " + parts.get(0));
- char type = parts.get(0).charAt(0);
- String beginValue = parts.get(1);
- String endValue = parts.get(2);
- switch (type) {
- case 'i':
- return new
IntegerPrimaryKeyIngestPosition(Strings.isNullOrEmpty(beginValue) ? null : new
BigInteger(beginValue), Strings.isNullOrEmpty(endValue) ? null : new
BigInteger(endValue));
- case 's':
- return new StringPrimaryKeyIngestPosition(beginValue,
endValue);
- case 'u':
- return new UnsupportedKeyIngestPosition();
- default:
- throw new IllegalArgumentException("Unknown primary key
position type: " + type);
- }
- }
-
- /**
- * New instance by begin value and end value.
- *
- * @param beginValue begin value
- * @param endValue end value
- * @return ingest position
- */
- public static PrimaryKeyIngestPosition<?> newInstance(final Object
beginValue, final Object endValue) {
- if (beginValue instanceof Number) {
- return new
IntegerPrimaryKeyIngestPosition(convertToBigInteger((Number) beginValue), null
== endValue ? null : convertToBigInteger((Number) endValue));
- }
- if (beginValue instanceof CharSequence) {
- return new StringPrimaryKeyIngestPosition(beginValue.toString(),
null == endValue ? null : endValue.toString());
- }
- // TODO support more types, e.g. byte[] (MySQL varbinary)
- return new UnsupportedKeyIngestPosition();
- }
-
- private static BigInteger convertToBigInteger(final Number number) {
- if (number instanceof BigInteger) {
- return (BigInteger) number;
- }
- return BigInteger.valueOf(number.longValue());
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
new file mode 100644
index 00000000000..453146d8d9a
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * Unique key ingest position.
+ *
+ * @param <T> type of value
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public final class UniqueKeyIngestPosition<T> implements IngestPosition {
+
+ @Getter
+ private final char type;
+
+ private final Range<T> range;
+
+ /**
+ * New instance by lower bound and upper bound.
+ *
+ * @param <T> type of value
+ * @param range range
+ * @return ingest position
+ */
+ public static <T> UniqueKeyIngestPosition<?> newInstance(final Range<T>
range) {
+ T lowerBound = range.getLowerBound();
+ T upperBound = range.getUpperBound();
+ if (lowerBound instanceof Number) {
+ BigInteger lower = convertToBigInteger((Number) lowerBound);
+ BigInteger upper = null == upperBound ? null :
convertToBigInteger((Number) upperBound);
+ return ofInteger(range.isLowerInclusive() ? Range.closed(lower,
upper) : Range.openClosed(lower, upper));
+ }
+ if (lowerBound instanceof CharSequence) {
+ String lower = lowerBound.toString();
+ String upper = null == upperBound ? null : upperBound.toString();
+ return ofString(range.isLowerInclusive() ? Range.closed(lower,
upper) : Range.openClosed(lower, upper));
+ }
+ // TODO support more types, e.g. byte[] (MySQL varbinary)
+ return ofUnsplit();
+ }
+
+ private static BigInteger convertToBigInteger(final Number number) {
+ if (number instanceof BigInteger) {
+ return (BigInteger) number;
+ }
+ return BigInteger.valueOf(number.longValue());
+ }
+
+ /**
+ * Create integer unique key ingest position.
+ *
+ * @param range range
+ * @return integer unique key ingest position
+ */
+ public static UniqueKeyIngestPosition<BigInteger> ofInteger(final
Range<BigInteger> range) {
+ return new UniqueKeyIngestPosition<>('i', range);
+ }
+
+ /**
+ * Create string unique key ingest position.
+ *
+ * @param range range
+ * @return string unique key ingest position
+ */
+ public static UniqueKeyIngestPosition<String> ofString(final Range<String>
range) {
+ return new UniqueKeyIngestPosition<>('s', range);
+ }
+
+ /**
+ * Create unsplit unique key ingest position.
+ *
+ * @return unsplit unique key ingest position
+ */
+ public static UniqueKeyIngestPosition<Void> ofUnsplit() {
+ return new UniqueKeyIngestPosition<>('u', Range.closed(null, null));
+ }
+
+ /**
+ * Create new instance by text.
+ *
+ * @param text text
+ * @return unique key position
+ * @throws IllegalArgumentException illegal argument exception
+ */
+ public static UniqueKeyIngestPosition<?> decode(final String text) {
+ List<String> parts = Splitter.on(',').splitToList(text);
+ Preconditions.checkArgument(3 == parts.size(), "Unknown unique key
position: " + text);
+ Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid
unique key position type: " + parts.get(0));
+ char type = parts.get(0).charAt(0);
+ String lowerBound = parts.get(1);
+ String upperBound = parts.get(2);
+ switch (type) {
+ case 'i':
+ BigInteger lower = Strings.isNullOrEmpty(lowerBound) ? null :
new BigInteger(lowerBound);
+ BigInteger upper = Strings.isNullOrEmpty(upperBound) ? null :
new BigInteger(upperBound);
+ return ofInteger(Range.closed(lower, upper));
+ case 's':
+ return ofString(Range.closed(lowerBound, upperBound));
+ case 'u':
+ return ofUnsplit();
+ default:
+ throw new IllegalArgumentException("Unknown unique key
position type: " + type);
+ }
+ }
+
+ /**
+ * Encode to text.
+ *
+ * @return encoded text
+ * @throws RuntimeException runtime exception
+ */
+ public String encode() {
+ T lowerBound = getLowerBound();
+ T upperBound = getUpperBound();
+ String encodedLowerBound;
+ String encodedUpperBound;
+ switch (getType()) {
+ case 'i':
+ case 's':
+ case 'u':
+ encodedLowerBound = null == lowerBound ? "" :
lowerBound.toString();
+ encodedUpperBound = null == upperBound ? "" :
upperBound.toString();
+ break;
+ default:
+ throw new RuntimeException("Unknown unique key position type:
" + getType());
+ }
+ return String.format("%s,%s,%s", getType(), encodedLowerBound,
encodedUpperBound);
+ }
+
+ /**
+ * Get lower bound.
+ *
+ * @return lower bound
+ */
+ public T getLowerBound() {
+ return range.getLowerBound();
+ }
+
+ /**
+ * Get upper bound.
+ *
+ * @return upper bound
+ */
+ public T getUpperBound() {
+ return range.getUpperBound();
+ }
+
+ @Override
+ public String toString() {
+ // TODO Add encode() method in IngestPosition interface, and remove
.toString() invocations.
+ return encode();
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
deleted file mode 100644
index c82121c2025..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-import java.math.BigInteger;
-
-/**
- * Integer primary key ingest position.
- */
-public final class IntegerPrimaryKeyIngestPosition implements
PrimaryKeyIngestPosition<BigInteger> {
-
- private final BigInteger beginValue;
-
- private final BigInteger endValue;
-
- public IntegerPrimaryKeyIngestPosition(final BigInteger beginValue, final
BigInteger endValue) {
- this.beginValue = beginValue;
- this.endValue = endValue;
- }
-
- @Override
- public BigInteger getBeginValue() {
- return beginValue;
- }
-
- @Override
- public BigInteger getEndValue() {
- return endValue;
- }
-
- @Override
- public char getType() {
- return 'i';
- }
-
- @Override
- public String toString() {
- return String.format("%s,%s,%s", getType(), null == beginValue ? "" :
beginValue, null == endValue ? "" : endValue);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
deleted file mode 100644
index 96cf79ae708..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import com.google.common.base.Strings;
-import lombok.Getter;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-/**
- * String primary key ingest position.
- */
-@Getter
-public final class StringPrimaryKeyIngestPosition implements
PrimaryKeyIngestPosition<String> {
-
- private final String beginValue;
-
- private final String endValue;
-
- public StringPrimaryKeyIngestPosition(final String beginValue, final
String endValue) {
- this.beginValue = Strings.emptyToNull(beginValue);
- this.endValue = Strings.emptyToNull(endValue);
- }
-
- @Override
- public char getType() {
- return 's';
- }
-
- @Override
- public String toString() {
- return String.format("%s,%s,%s", getType(), null == beginValue ? "" :
beginValue, null == endValue ? "" : endValue);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
deleted file mode 100644
index 8406c07518d..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-/**
- * Unsupported key ingest position.
- */
-public final class UnsupportedKeyIngestPosition implements
PrimaryKeyIngestPosition<Void> {
-
- @Override
- public Void getBeginValue() {
- return null;
- }
-
- @Override
- public Void getEndValue() {
- return null;
- }
-
- @Override
- public char getType() {
- return 'u';
- }
-
- @Override
- public String toString() {
- return String.format("%s,,", getType());
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
index e3c464b8f66..53accc9d52f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
@@ -19,8 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
import com.google.common.base.Strings;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemInventoryTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlJobItemInventoryTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
@@ -83,6 +83,6 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
private Function<Entry<String, String>, InventoryTaskProgress>
getInventoryTaskProgressFunction() {
return entry -> new InventoryTaskProgress(
- Strings.isNullOrEmpty(entry.getValue()) ? new
IngestPlaceholderPosition() :
PrimaryKeyIngestPositionFactory.newInstance(entry.getValue()));
+ Strings.isNullOrEmpty(entry.getValue()) ? new
IngestPlaceholderPosition() : UniqueKeyIngestPosition.decode(entry.getValue()));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 17ca03b6b5d..baf3743c797 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryDataSparsenessCalculator;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
@@ -66,7 +66,7 @@ public final class InventoryPositionCalculator {
if (dataTypeOption.isStringDataType(firstColumnDataType)) {
return getStringPositions();
}
- return Collections.singletonList(new UnsupportedKeyIngestPosition());
+ return Collections.singletonList(UniqueKeyIngestPosition.ofUnsplit());
}
private List<IngestPosition> getIntegerPositions() {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 7889168491e..c35e231b6da 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJ
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.IntegerRangeSplittingIterator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -81,15 +81,14 @@ public final class InventoryPositionEstimatedCalculator {
BigInteger lowerBound = uniqueKeyValuesRange.getLowerBound();
BigInteger upperBound = uniqueKeyValuesRange.getUpperBound();
if (0 == tableRecordsCount || null == lowerBound || null ==
upperBound) {
- return Collections.singletonList(new
IntegerPrimaryKeyIngestPosition(null, null));
+ return
Collections.singletonList(UniqueKeyIngestPosition.ofInteger(Range.closed(null,
null)));
}
List<IngestPosition> result = new LinkedList<>();
long splitCount = tableRecordsCount / shardingSize +
(tableRecordsCount % shardingSize > 0 ? 1 : 0);
BigInteger stepSize =
upperBound.subtract(lowerBound).divide(BigInteger.valueOf(splitCount));
IntegerRangeSplittingIterator rangeIterator = new
IntegerRangeSplittingIterator(lowerBound, upperBound, stepSize);
while (rangeIterator.hasNext()) {
- Range<BigInteger> range = rangeIterator.next();
- result.add(new
IntegerPrimaryKeyIngestPosition(range.getLowerBound(), range.getUpperBound()));
+
result.add(UniqueKeyIngestPosition.ofInteger(rangeIterator.next()));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
index 591c201e64b..040a715eae8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
@@ -17,7 +17,7 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -57,5 +57,5 @@ public interface DataTypePositionHandler<T> {
* @param upperBound upper bound
* @return ingest position
*/
- PrimaryKeyIngestPosition<T> createIngestPosition(T lowerBound, T
upperBound);
+ UniqueKeyIngestPosition<T> createIngestPosition(T lowerBound, T
upperBound);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
index f401132e436..0febb10491d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -17,7 +17,8 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -42,7 +43,7 @@ public final class IntegerPositionHandler implements
DataTypePositionHandler<Big
}
@Override
- public IntegerPrimaryKeyIngestPosition createIngestPosition(final
BigInteger lowerBound, final BigInteger upperBound) {
- return new IntegerPrimaryKeyIngestPosition(lowerBound, upperBound);
+ public UniqueKeyIngestPosition<BigInteger> createIngestPosition(final
BigInteger lowerBound, final BigInteger upperBound) {
+ return UniqueKeyIngestPosition.ofInteger(Range.closed(lowerBound,
upperBound));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index 6d4440e677b..cc53f6c86f9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -56,14 +56,14 @@ public final class InventoryPositionExactCalculator {
public static <T> List<IngestPosition> getPositions(final QualifiedTable
qualifiedTable, final String uniqueKey, final int shardingSize,
final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
List<IngestPosition> result = new LinkedList<>();
- PrimaryKeyIngestPosition<T> firstPosition =
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource,
positionHandler);
+ UniqueKeyIngestPosition<T> firstPosition =
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource,
positionHandler);
result.add(firstPosition);
result.addAll(getLeftPositions(qualifiedTable, uniqueKey,
shardingSize, firstPosition, dataSource, positionHandler));
return result;
}
- private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
- final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
+ private static <T> UniqueKeyIngestPosition<T> getFirstPosition(final
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+ final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
String firstQuerySQL = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey, false);
try (
@@ -90,10 +90,10 @@ public final class InventoryPositionExactCalculator {
}
private static <T> List<IngestPosition> getLeftPositions(final
QualifiedTable qualifiedTable, final String uniqueKey,
- final int
shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
+ final int
shardingSize, final UniqueKeyIngestPosition<T> firstPosition,
final
PipelineDataSource dataSource, final DataTypePositionHandler<T>
positionHandler) {
List<IngestPosition> result = new LinkedList<>();
- T lowerBound = firstPosition.getEndValue();
+ T lowerBound = firstPosition.getUpperBound();
long recordsCount = 0;
String laterQuerySQL = new
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName(), uniqueKey, true);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
index 331503f7c24..43086e37d10 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
@@ -17,8 +17,8 @@
package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -40,7 +40,7 @@ public final class StringPositionHandler implements
DataTypePositionHandler<Stri
}
@Override
- public PrimaryKeyIngestPosition<String> createIngestPosition(final String
lowerBound, final String upperBound) {
- return new StringPrimaryKeyIngestPosition(lowerBound, upperBound);
+ public UniqueKeyIngestPosition<String> createIngestPosition(final String
lowerBound, final String upperBound) {
+ return UniqueKeyIngestPosition.ofString(Range.closed(lowerBound,
upperBound));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 34fae406389..4ef23e9e6df 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -107,7 +107,7 @@ public final class InventoryDumperContextSplitter {
long tableRecordsCount =
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
if (!dumperContext.hasUniqueKey()) {
- return Collections.singleton(new UnsupportedKeyIngestPosition());
+ return Collections.singleton(UniqueKeyIngestPosition.ofUnsplit());
}
String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
int shardingSize =
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
index eb3a7753c17..9a0298edeae 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
@@ -18,8 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.junit.jupiter.api.Test;
import java.sql.ResultSet;
@@ -35,14 +34,13 @@ class UniqueKeyInventoryDataRecordPositionCreatorTest {
@Test
void assertCreate() throws SQLException {
- assertThat(new
UniqueKeyInventoryDataRecordPositionCreator().create(mockInventoryDumperContext(),
mock(ResultSet.class)), isA(UnsupportedKeyIngestPosition.class));
+ assertThat(new
UniqueKeyInventoryDataRecordPositionCreator().create(mockInventoryDumperContext(),
mock(ResultSet.class)), isA(UniqueKeyIngestPosition.class));
}
- @SuppressWarnings("unchecked")
private InventoryDumperContext mockInventoryDumperContext() {
InventoryDumperContext result = mock(InventoryDumperContext.class,
RETURNS_DEEP_STUBS);
- PrimaryKeyIngestPosition<Object> ingestPosition =
mock(PrimaryKeyIngestPosition.class);
- when(ingestPosition.getEndValue()).thenReturn(new Object());
+ UniqueKeyIngestPosition<?> ingestPosition =
mock(UniqueKeyIngestPosition.class);
+ when(ingestPosition.getUpperBound()).thenReturn(null);
when(result.getCommonContext().getPosition()).thenReturn(ingestPosition);
return result;
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
deleted file mode 100644
index a67a7034762..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
-import org.junit.jupiter.api.Test;
-
-import java.math.BigInteger;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-class PrimaryKeyIngestPositionFactoryTest {
-
- @Test
- void assertNewInstanceWithIntegerPrimaryKeyIngestPosition() {
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,200"),
new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L),
BigInteger.valueOf(200L)));
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,"),
new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L), null));
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,200"),
new IntegerPrimaryKeyIngestPosition(null, BigInteger.valueOf(200L)));
-
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,"),
new IntegerPrimaryKeyIngestPosition(null, null));
- }
-
- private void assertIntegerPrimaryKeyIngestPosition0(final
PrimaryKeyIngestPosition<?> actual, final IntegerPrimaryKeyIngestPosition
expected) {
- assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
- assertThat(actual.getType(), is(expected.getType()));
- assertThat(actual.getBeginValue(), is(expected.getBeginValue()));
- assertThat(actual.getEndValue(), is(expected.getEndValue()));
- }
-
- @Test
- void assertNewInstanceWithStringPrimaryKeyIngestPosition() {
- StringPrimaryKeyIngestPosition actual =
(StringPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance("s,a,b");
- assertThat(actual.getType(), is('s'));
- assertThat(actual.getBeginValue(), is("a"));
- assertThat(actual.getEndValue(), is("b"));
- }
-
- @Test
- void assertNewInstanceWithUnsupportedKeyIngestPosition() {
- UnsupportedKeyIngestPosition actual = (UnsupportedKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance("u,a,b");
- assertThat(actual.getType(), is('u'));
- assertNull(actual.getBeginValue());
- assertNull(actual.getEndValue());
- }
-
- @Test
- void assertNewInstanceWithIllegalArgument() {
- assertThrows(IllegalArgumentException.class, () ->
PrimaryKeyIngestPositionFactory.newInstance("z,100"));
- assertThrows(IllegalArgumentException.class, () ->
PrimaryKeyIngestPositionFactory.newInstance("zz,100,200"));
- assertThrows(IllegalArgumentException.class, () ->
PrimaryKeyIngestPositionFactory.newInstance("z,100,200"));
- }
-
- @Test
- void assertNewInstanceWithNumberRange() {
- IntegerPrimaryKeyIngestPosition actual =
(IntegerPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance(100, 200);
- assertThat(actual.getType(), is('i'));
- assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
- assertThat(actual.getEndValue(), is(BigInteger.valueOf(200L)));
- }
-
- @Test
- void assertNewInstanceWithNumberNullEndRange() {
- IntegerPrimaryKeyIngestPosition actual =
(IntegerPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance(100, null);
- assertThat(actual.getType(), is('i'));
- assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
- assertNull(actual.getEndValue());
- }
-
- @Test
- void assertNewInstanceWithCharRange() {
- StringPrimaryKeyIngestPosition actual =
(StringPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance("a", "b");
- assertThat(actual.getType(), is('s'));
- assertThat(actual.getBeginValue(), is("a"));
- assertThat(actual.getEndValue(), is("b"));
- }
-
- @Test
- void assertNewInstanceWithCharNullEndRange() {
- StringPrimaryKeyIngestPosition actual =
(StringPrimaryKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance("a", null);
- assertThat(actual.getType(), is('s'));
- assertThat(actual.getBeginValue(), is("a"));
- assertNull(actual.getEndValue());
- }
-
- @Test
- void assertNewInstanceWithUnsupportedRange() {
- UnsupportedKeyIngestPosition actual = (UnsupportedKeyIngestPosition)
PrimaryKeyIngestPositionFactory.newInstance(Collections.emptyList(),
Collections.emptyList());
- assertThat(actual.getType(), is('u'));
- assertNull(actual.getBeginValue());
- assertNull(actual.getEndValue());
- }
-}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
new file mode 100644
index 00000000000..fb15f5e0cbc
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class UniqueKeyIngestPositionTest {
+
+ @Test
+ void assertEncodeInteger() {
+ assertThat(createIntegerPosition(1L, 100L).encode(), is("i,1,100"));
+ }
+
+ private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final
Long lowerBound, final Long upperBound) {
+ return UniqueKeyIngestPosition.ofInteger(Range.closed(null ==
lowerBound ? null : BigInteger.valueOf(lowerBound), null == upperBound ? null :
BigInteger.valueOf(upperBound)));
+ }
+
+ @Test
+ void assertEncodeIntegerWithNullValue() {
+ assertThat(createIntegerPosition(1L, null).encode(), is("i,1,"));
+ assertThat(createIntegerPosition(null, 100L).encode(), is("i,,100"));
+ assertThat(createIntegerPosition(null, null).encode(), is("i,,"));
+ }
+
+ @Test
+ void assertEncodeBigInteger() {
+ assertThat(UniqueKeyIngestPosition.ofInteger(Range.closed(new
BigInteger("12345678901234567890"), new
BigInteger("12345678901234567891"))).encode(),
+ is("i,12345678901234567890,12345678901234567891"));
+ }
+
+ @Test
+ void assertEncodeString() {
+ assertThat(UniqueKeyIngestPosition.ofString(Range.closed("hi",
"jk")).encode(), is("s,hi,jk"));
+ }
+
+ @Test
+ void assertEncodeStringWithNullValue() {
+ assertThat(UniqueKeyIngestPosition.ofString(Range.closed(null,
null)).encode(), is("s,,"));
+ }
+
+ @Test
+ void assertEncodeUnsplit() {
+ assertThat(UniqueKeyIngestPosition.ofUnsplit().encode(), is("u,,"));
+ }
+
+ @Test
+ void assertDecodeIntegerPosition() {
+ assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,100,200"),
createIntegerPosition(100L, 200L));
+ assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,100,"),
createIntegerPosition(100L, null));
+ assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,,200"),
createIntegerPosition(null, 200L));
+ assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,,"),
createIntegerPosition(null, null));
+ }
+
+ private void assertIntegerPosition0(final UniqueKeyIngestPosition<?>
actual, final UniqueKeyIngestPosition<BigInteger> expected) {
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ assertThat(actual.getType(), is(expected.getType()));
+ assertThat(actual.getLowerBound(), is(expected.getLowerBound()));
+ assertThat(actual.getUpperBound(), is(expected.getUpperBound()));
+ }
+
+ @Test
+ void assertDecodeStringPosition() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.decode("s,a,b");
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ assertThat(actual.getType(), is('s'));
+ assertThat(actual.getLowerBound(), is("a"));
+ assertThat(actual.getUpperBound(), is("b"));
+ }
+
+ @Test
+ void assertDecodeUnsplitPosition() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.decode("u,a,b");
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ assertThat(actual.getType(), is('u'));
+ assertNull(actual.getLowerBound());
+ assertNull(actual.getUpperBound());
+ }
+
+ @Test
+ void assertDecodeIllegalArgument() {
+ assertThrows(IllegalArgumentException.class, () ->
UniqueKeyIngestPosition.decode("z,100"));
+ assertThrows(IllegalArgumentException.class, () ->
UniqueKeyIngestPosition.decode("zz,100,200"));
+ assertThrows(IllegalArgumentException.class, () ->
UniqueKeyIngestPosition.decode("z,100,200"));
+ }
+
+ @Test
+ void assertNewInstanceWithNumberRange() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed(BigInteger.valueOf(100L),
BigInteger.valueOf(200L)));
+ assertThat(actual.getType(), is('i'));
+ assertThat(actual.getLowerBound(), is(BigInteger.valueOf(100L)));
+ assertThat(actual.getUpperBound(), is(BigInteger.valueOf(200L)));
+ }
+
+ @Test
+ void assertNewInstanceWithNumberNullEndRange() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed(BigInteger.valueOf(100L),
null));
+ assertThat(actual.getType(), is('i'));
+ assertThat(actual.getLowerBound(), is(BigInteger.valueOf(100L)));
+ assertNull(actual.getUpperBound());
+ }
+
+ @Test
+ void assertNewInstanceWithStringRange() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed("a", "b"));
+ assertThat(actual.getType(), is('s'));
+ assertThat(actual.getLowerBound(), is("a"));
+ assertThat(actual.getUpperBound(), is("b"));
+ }
+
+ @Test
+ void assertNewInstanceWithStringNullEndRange() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed("a", null));
+ assertThat(actual.getType(), is('s'));
+ assertThat(actual.getLowerBound(), is("a"));
+ assertNull(actual.getUpperBound());
+ }
+
+ @Test
+ void assertNewInstanceWithUnsplitRange() {
+ UniqueKeyIngestPosition<?> actual =
UniqueKeyIngestPosition.newInstance(Range.closed(null, null));
+ assertThat(actual.getType(), is('u'));
+ assertNull(actual.getLowerBound());
+ assertNull(actual.getUpperBound());
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
deleted file mode 100644
index 0aad9848b0d..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import java.math.BigInteger;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class IntegerPrimaryKeyIngestPositionTest {
-
- @Test
- void assertToString() {
- assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
BigInteger.valueOf(100L)).toString(), is("i,1,100"));
- }
-
- @Test
- void assertToStringWithNullValue() {
- assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
null).toString(), is("i,1,"));
- assertThat(new IntegerPrimaryKeyIngestPosition(null,
BigInteger.valueOf(100L)).toString(), is("i,,100"));
- assertThat(new IntegerPrimaryKeyIngestPosition(null, null).toString(),
is("i,,"));
- }
-}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
deleted file mode 100644
index 8cea55acf3e..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class StringPrimaryKeyIngestPositionTest {
-
- @Test
- void assertToString() {
- assertThat(new StringPrimaryKeyIngestPosition("hi", "jk").toString(),
is("s,hi,jk"));
- }
-
- @Test
- void assertToStringWithNullValue() {
- assertThat(new StringPrimaryKeyIngestPosition(null, null).toString(),
is("s,,"));
- }
-}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
deleted file mode 100644
index 5bc5fc51afc..00000000000
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class UnsupportedKeyIngestPositionTest {
-
- @Test
- void assertToString() {
- assertThat(new UnsupportedKeyIngestPosition().toString(), is("u,,"));
- }
-}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
index 723803a9def..4eaf3a1b8f6 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
@@ -17,11 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlTransmissionJobItemProgress;
@@ -69,7 +68,7 @@ class TransmissionJobItemProgressTest {
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds0.t_1#1"),
isA(IngestFinishedPosition.class));
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds1.t_1#1"),
isA(IngestPlaceholderPosition.class));
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds0.t_2#2"),
isA(IngestFinishedPosition.class));
-
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"),
isA(IntegerPrimaryKeyIngestPosition.class));
+
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"),
isA(UniqueKeyIngestPosition.class));
}
@Test
@@ -85,21 +84,21 @@ class TransmissionJobItemProgressTest {
@Test
void assertGetProgressesCorrectly() {
Map<String, InventoryTaskProgress> progresses = new HashMap<>(4, 1F);
- progresses.put("ds.order_item#0", new InventoryTaskProgress(new
IntegerPrimaryKeyIngestPosition(BigInteger.ONE, BigInteger.valueOf(100L))));
- progresses.put("ds.order_item#1", new InventoryTaskProgress(new
UnsupportedKeyIngestPosition()));
+ progresses.put("ds.order_item#0", new
InventoryTaskProgress(UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.ONE,
BigInteger.valueOf(100L)))));
+ progresses.put("ds.order_item#1", new
InventoryTaskProgress(UniqueKeyIngestPosition.ofUnsplit()));
progresses.put("ds.order#0", new InventoryTaskProgress(new
IngestFinishedPosition()));
- progresses.put("ds.test_order#0", new InventoryTaskProgress(new
StringPrimaryKeyIngestPosition("1", "100")));
+ progresses.put("ds.test_order#0", new
InventoryTaskProgress(UniqueKeyIngestPosition.ofString(Range.closed("1",
"100"))));
JobItemInventoryTasksProgress progress = new
JobItemInventoryTasksProgress(progresses);
Map<String, IngestPosition> orderPosition =
progress.getInventoryPosition("order");
assertThat(orderPosition.size(), is(1));
assertThat(orderPosition.get("ds.order#0"),
isA(IngestFinishedPosition.class));
Map<String, IngestPosition> testOrderPosition =
progress.getInventoryPosition("test_order");
assertThat(testOrderPosition.size(), is(1));
- assertThat(testOrderPosition.get("ds.test_order#0"),
isA(StringPrimaryKeyIngestPosition.class));
+ assertThat(testOrderPosition.get("ds.test_order#0"),
isA(UniqueKeyIngestPosition.class));
Map<String, IngestPosition> orderItemPosition =
progress.getInventoryPosition("order_item");
assertThat(orderItemPosition.size(), is(2));
- assertThat(orderItemPosition.get("ds.order_item#0"),
isA(IntegerPrimaryKeyIngestPosition.class));
- assertThat(orderItemPosition.get("ds.order_item#1"),
isA(UnsupportedKeyIngestPosition.class));
+ assertThat(orderItemPosition.get("ds.order_item#0"),
isA(UniqueKeyIngestPosition.class));
+ assertThat(orderItemPosition.get("ds.order_item#1"),
isA(UniqueKeyIngestPosition.class));
}
private TransmissionJobItemProgress getJobItemProgress(final String data) {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
index b88e1fa59ec..c4eca31b6e1 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
@@ -29,11 +29,15 @@ class InventoryDataSparsenessCalculatorTest {
@Test
void assertIsIntegerUniqueKeyDataSparse() {
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1000000L))));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(null, BigInteger.valueOf(1000000L))));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, null)));
-
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1494000L))));
-
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1495000L))));
-
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(1500000L))));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L,
createRange(1L, 1000000L)));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
createRange(null, 1000000L)));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
createRange(1L, null)));
+
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
createRange(1L, 1494000L)));
+
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
createRange(1L, 1495000L)));
+
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
createRange(1L, 1500000L)));
+ }
+
+ private Range<BigInteger> createRange(final Long lowerBound, final Long
upperBound) {
+ return Range.closed(null == lowerBound ? null :
BigInteger.valueOf(lowerBound), null == upperBound ? null :
BigInteger.valueOf(upperBound));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index 4e1d5a3d01e..743f862e7ff 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.junit.jupiter.api.Test;
import java.math.BigInteger;
@@ -35,39 +35,39 @@ class InventoryPositionEstimatedCalculatorTest {
void assertGetIntegerPositions() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.ONE, BigInteger.valueOf(600L)), 100L);
assertThat(actualPositions.size(), is(2));
- assertPosition(actualPositions.get(0), createIntegerPosition(1L,
300L));
- assertPosition(actualPositions.get(1), createIntegerPosition(301L,
600L));
+ assertIntegerPosition(actualPositions.get(0),
createIntegerPosition(1L, 300L));
+ assertIntegerPosition(actualPositions.get(1),
createIntegerPosition(301L, 600L));
}
- private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
- return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
+ private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final
long lowerBound, final long upperBound) {
+ return
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound),
BigInteger.valueOf(upperBound)));
}
- private void assertPosition(final IngestPosition actual, final
IntegerPrimaryKeyIngestPosition expected) {
- assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
- assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(),
is(expected.getBeginValue()));
- assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(),
is(expected.getEndValue()));
+ private void assertIntegerPosition(final IngestPosition actual, final
UniqueKeyIngestPosition<BigInteger> expected) {
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ assertThat(((UniqueKeyIngestPosition<?>) actual).getLowerBound(),
is(expected.getLowerBound()));
+ assertThat(((UniqueKeyIngestPosition<?>) actual).getUpperBound(),
is(expected.getUpperBound()));
}
@Test
void assertGetIntegerPositionsWithZeroTotalRecordsCount() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(0L,
Range.closed(BigInteger.ZERO, BigInteger.ONE), 1L);
assertThat(actualPositions.size(), is(1));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(null, null));
+ assertIntegerPosition(actualPositions.get(0),
UniqueKeyIngestPosition.ofInteger(Range.closed(null, null)));
}
@Test
void assertGetIntegerPositionsWithNullValue() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(null, null), 1L);
assertThat(actualPositions.size(), is(1));
- assertPosition(actualPositions.get(0), new
IntegerPrimaryKeyIngestPosition(null, null));
+ assertIntegerPosition(actualPositions.get(0),
UniqueKeyIngestPosition.ofInteger(Range.closed(null, null)));
}
@Test
void assertGetIntegerPositionsWithTheSameMinMax() {
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(200L,
Range.closed(BigInteger.valueOf(5L), BigInteger.valueOf(5L)), 100L);
assertThat(actualPositions.size(), is(1));
- assertPosition(actualPositions.get(0), createIntegerPosition(5L, 5L));
+ assertIntegerPosition(actualPositions.get(0),
createIntegerPosition(5L, 5L));
}
@Test
@@ -78,7 +78,7 @@ class InventoryPositionEstimatedCalculatorTest {
BigInteger upperBound = BigInteger.valueOf(Long.MAX_VALUE);
List<IngestPosition> actualPositions =
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount,
Range.closed(lowerBound, upperBound), shardingSize);
assertThat(actualPositions.size(), is(2));
- assertPosition(actualPositions.get(0),
createIntegerPosition(lowerBound.longValue(), 0L));
- assertPosition(actualPositions.get(1), createIntegerPosition(1L,
upperBound.longValue()));
+ assertIntegerPosition(actualPositions.get(0),
createIntegerPosition(lowerBound.longValue(), 0L));
+ assertIntegerPosition(actualPositions.get(1),
createIntegerPosition(1L, upperBound.longValue()));
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 5ee5d4aed7e..732fa3ea9c0 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -21,7 +21,8 @@ import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.junit.jupiter.params.ParameterizedTest;
@@ -39,7 +40,7 @@ class DataRecordResultConvertUtilsTest {
@ParameterizedTest
@MethodSource("dataChangeTypeTestCases")
void assertConvertDataRecordToRecordWithNonInsertTypes(final
PipelineSQLOperationType operationType, final Record.DataChangeType
expectedDataChangeType) throws InvalidProtocolBufferException {
- DataRecord dataRecord = new DataRecord(operationType, "test_schema",
"t_user", new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(5L),
BigInteger.valueOf(10L)), 1);
+ DataRecord dataRecord = new DataRecord(operationType, "test_schema",
"t_user",
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(5L),
BigInteger.valueOf(10L))), 1);
dataRecord.addColumn(new NormalColumn("id", 1L, 2L, true, true));
dataRecord.setCommitTime(123L);
Record actualRecord =
DataRecordResultConvertUtils.convertDataRecordToRecord("logic_db",
"test_schema", dataRecord);
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 0825de76307..f1e70f0c64d 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -18,8 +18,8 @@
package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -65,8 +65,8 @@ class ConsistencyCheckJobItemContextTest {
new TableCheckRangePosition(1, DATA_NODE, TABLE,
createIntegerPosition(101L, 200L), createIntegerPosition(101L, 203L), null,
132, 132, false, null));
}
- private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
- return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
+ private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final
long lowerBound, final long upperBound) {
+ return
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound),
BigInteger.valueOf(upperBound)));
}
private void assertTableCheckRangePosition(final TableCheckRangePosition
actual, final TableCheckRangePosition expected) {
@@ -80,10 +80,10 @@ class ConsistencyCheckJobItemContextTest {
assertThat(actual.isFinished(), is(expected.isFinished()));
}
- private void assertRange(final PrimaryKeyIngestPosition<?> actual, final
PrimaryKeyIngestPosition<?> expected) {
+ private void assertRange(final UniqueKeyIngestPosition<?> actual, final
UniqueKeyIngestPosition<?> expected) {
assertThat(actual.getClass(), is(expected.getClass()));
- assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
- assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(),
is(((IntegerPrimaryKeyIngestPosition) expected).getBeginValue()));
- assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(),
is(((IntegerPrimaryKeyIngestPosition) expected).getEndValue()));
+ assertThat(actual, instanceOf(UniqueKeyIngestPosition.class));
+ assertThat(actual.getLowerBound(), is(expected.getLowerBound()));
+ assertThat(actual.getUpperBound(), is(expected.getUpperBound()));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index bc9b35f1b90..21257453936 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -34,7 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -136,7 +136,7 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
for (JobDataNodeEntry entry : each.getEntries()) {
for (DataNode dataNode : entry.getDataNodes()) {
result.add(new TableCheckRangePosition(splittingItem++,
dataNode.format(), entry.getLogicTableName(),
- new UnsupportedKeyIngestPosition(), new
UnsupportedKeyIngestPosition(), null));
+ UniqueKeyIngestPosition.ofUnsplit(),
UniqueKeyIngestPosition.ofUnsplit(), null));
}
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 47640a16556..4175d57b31e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -33,7 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -194,7 +194,7 @@ class CDCE2EIT {
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0,
sourceDataSource.getDatabaseType().getType());
progressContext.getTableCheckRangePositions().add(new
TableCheckRangePosition(0, null, qualifiedTable.getTableName(),
- new UnsupportedKeyIngestPosition(), new
UnsupportedKeyIngestPosition(), null));
+ UniqueKeyIngestPosition.ofUnsplit(),
UniqueKeyIngestPosition.ofUnsplit(), null));
TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
qualifiedTable, qualifiedTable,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
index 4d3a1ad3591..5e1af738d5f 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
@@ -19,8 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import org.apache.commons.text.RandomStringGenerator;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -85,29 +86,29 @@ class InventoryIntegerPositionExactCalculatorTest {
void assertGetPositionsWithOrderIdUniqueKey() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "order_id", 3, dataSource, new IntegerPositionHandler());
assertThat(actual.size(), is(4));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(0),
createIntegerPosition(1L, 3L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(1),
createIntegerPosition(4L, 6L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(2),
createIntegerPosition(7L, 9L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(3),
createIntegerPosition(10L, 11L));
+ assertIntegerPosition0(actual.get(0), createIntegerPosition(1L, 3L));
+ assertIntegerPosition0(actual.get(1), createIntegerPosition(4L, 6L));
+ assertIntegerPosition0(actual.get(2), createIntegerPosition(7L, 9L));
+ assertIntegerPosition0(actual.get(3), createIntegerPosition(10L, 11L));
}
- private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long
beginValue, final long endValue) {
- return new
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue),
BigInteger.valueOf(endValue));
+ private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final
long lowerBound, final long upperBound) {
+ return
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound),
BigInteger.valueOf(upperBound)));
}
- private void assertIntegerPrimaryKeyIngestPosition0(final IngestPosition
actual, final IntegerPrimaryKeyIngestPosition expected) {
- assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
- IntegerPrimaryKeyIngestPosition position =
(IntegerPrimaryKeyIngestPosition) actual;
+ private void assertIntegerPosition0(final IngestPosition actual, final
UniqueKeyIngestPosition<BigInteger> expected) {
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ UniqueKeyIngestPosition<?> position = (UniqueKeyIngestPosition<?>)
actual;
assertThat(position.getType(), is(expected.getType()));
- assertThat(position.getBeginValue(), is(expected.getBeginValue()));
- assertThat(position.getEndValue(), is(expected.getEndValue()));
+ assertThat(position.getLowerBound(), is(expected.getLowerBound()));
+ assertThat(position.getUpperBound(), is(expected.getUpperBound()));
}
@Test
void assertGetPositionsWithMultiColumnUniqueKeys() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "user_id", 3, dataSource, new IntegerPositionHandler());
assertThat(actual.size(), is(2));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(0),
createIntegerPosition(1L, 3L));
- assertIntegerPrimaryKeyIngestPosition0(actual.get(1),
createIntegerPosition(4L, 6L));
+ assertIntegerPosition0(actual.get(0), createIntegerPosition(1L, 3L));
+ assertIntegerPosition0(actual.get(1), createIntegerPosition(4L, 6L));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
index 60ab751c35b..17e40cf16eb 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
@@ -19,8 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
import org.apache.commons.text.RandomStringGenerator;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -84,25 +85,29 @@ class InventoryStringPositionExactCalculatorTest {
void assertGetPositionsWithOrderIdUniqueKey() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "order_id", 3, dataSource, new StringPositionHandler());
assertThat(actual.size(), is(4));
- assertStringPrimaryKeyIngestPosition0(actual.get(0), new
StringPrimaryKeyIngestPosition("a", "c"));
- assertStringPrimaryKeyIngestPosition0(actual.get(1), new
StringPrimaryKeyIngestPosition("d", "f"));
- assertStringPrimaryKeyIngestPosition0(actual.get(2), new
StringPrimaryKeyIngestPosition("g", "i"));
- assertStringPrimaryKeyIngestPosition0(actual.get(3), new
StringPrimaryKeyIngestPosition("j", "k"));
+ assertStringPosition0(actual.get(0), createStringPosition("a", "c"));
+ assertStringPosition0(actual.get(1), createStringPosition("d", "f"));
+ assertStringPosition0(actual.get(2), createStringPosition("g", "i"));
+ assertStringPosition0(actual.get(3), createStringPosition("j", "k"));
}
- private void assertStringPrimaryKeyIngestPosition0(final IngestPosition
actual, final StringPrimaryKeyIngestPosition expected) {
- assertThat(actual, isA(StringPrimaryKeyIngestPosition.class));
- StringPrimaryKeyIngestPosition position =
(StringPrimaryKeyIngestPosition) actual;
+ private UniqueKeyIngestPosition<String> createStringPosition(final String
lowerBound, final String upperBound) {
+ return UniqueKeyIngestPosition.ofString(Range.closed(lowerBound,
upperBound));
+ }
+
+ private void assertStringPosition0(final IngestPosition actual, final
UniqueKeyIngestPosition<String> expected) {
+ assertThat(actual, isA(UniqueKeyIngestPosition.class));
+ UniqueKeyIngestPosition<?> position = (UniqueKeyIngestPosition<?>)
actual;
assertThat(position.getType(), is(expected.getType()));
- assertThat(position.getBeginValue(), is(expected.getBeginValue()));
- assertThat(position.getEndValue(), is(expected.getEndValue()));
+ assertThat(position.getLowerBound(), is(expected.getLowerBound()));
+ assertThat(position.getUpperBound(), is(expected.getUpperBound()));
}
@Test
void assertGetPositionsWithMultiColumnUniqueKeys() {
List<IngestPosition> actual =
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null,
"t_order"), "user_id", 3, dataSource, new StringPositionHandler());
assertThat(actual.size(), is(2));
- assertStringPrimaryKeyIngestPosition0(actual.get(0), new
StringPrimaryKeyIngestPosition("a", "c"));
- assertStringPrimaryKeyIngestPosition0(actual.get(1), new
StringPrimaryKeyIngestPosition("d", "f"));
+ assertStringPosition0(actual.get(0), createStringPosition("a", "c"));
+ assertStringPosition0(actual.get(1), createStringPosition("d", "f"));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 051a6f5ad5c..a78701949d5 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -90,8 +90,8 @@ class InventoryTaskSplitterTest {
List<InventoryTask> actual =
inventoryTaskSplitter.split(jobItemContext);
assertThat(actual.size(), is(1));
InventoryTask task = actual.get(0);
- assertNull(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue());
- assertNull(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue());
+ assertNull(((UniqueKeyIngestPosition<?>)
task.getTaskProgress().getPosition()).getLowerBound());
+ assertNull(((UniqueKeyIngestPosition<?>)
task.getTaskProgress().getPosition()).getUpperBound());
}
@Test
@@ -100,8 +100,8 @@ class InventoryTaskSplitterTest {
List<InventoryTask> actual =
inventoryTaskSplitter.split(jobItemContext);
assertThat(actual.size(), is(10));
InventoryTask task = actual.get(9);
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getBeginValue(),
is(BigInteger.valueOf(91L)));
- assertThat(((IntegerPrimaryKeyIngestPosition)
task.getTaskProgress().getPosition()).getEndValue(),
is(BigInteger.valueOf(100L)));
+ assertThat(((UniqueKeyIngestPosition<?>)
task.getTaskProgress().getPosition()).getLowerBound(),
is(BigInteger.valueOf(91L)));
+ assertThat(((UniqueKeyIngestPosition<?>)
task.getTaskProgress().getPosition()).getUpperBound(),
is(BigInteger.valueOf(100L)));
}
@Test
@@ -110,9 +110,9 @@ class InventoryTaskSplitterTest {
List<InventoryTask> actual =
inventoryTaskSplitter.split(jobItemContext);
assertThat(actual.size(), is(1));
assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
- IntegerPrimaryKeyIngestPosition keyPosition =
(IntegerPrimaryKeyIngestPosition) actual.get(0).getTaskProgress().getPosition();
- assertThat(keyPosition.getBeginValue(), is(BigInteger.ONE));
- assertThat(keyPosition.getEndValue(), is(BigInteger.valueOf(999L)));
+ UniqueKeyIngestPosition<?> keyPosition = (UniqueKeyIngestPosition<?>)
actual.get(0).getTaskProgress().getPosition();
+ assertThat(keyPosition.getLowerBound(), is(BigInteger.ONE));
+ assertThat(keyPosition.getUpperBound(), is(BigInteger.valueOf(999L)));
}
@Test
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 030aec608cc..ea9e539aa8a 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,8 +23,9 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -79,7 +80,7 @@ class InventoryTaskTest {
InventoryTask inventoryTask = new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext),
PipelineContextUtils.getExecuteEngine(),
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class),
mock(Importer.class), position);
CompletableFuture.allOf(inventoryTask.start().toArray(new
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
- assertThat(inventoryTask.getTaskProgress().getPosition(),
isA(IntegerPrimaryKeyIngestPosition.class));
+ assertThat(inventoryTask.getTaskProgress().getPosition(),
isA(UniqueKeyIngestPosition.class));
}
private void initTableData(final IncrementalDumperContext dumperContext)
throws SQLException {
@@ -101,7 +102,7 @@ class InventoryTaskTest {
result.setActualTableName(actualTableName);
result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
result.getCommonContext().setPosition(null ==
taskConfig.getDumperContext().getCommonContext().getPosition()
- ? new IntegerPrimaryKeyIngestPosition(BigInteger.ONE,
BigInteger.valueOf(1000L))
+ ?
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.ONE,
BigInteger.valueOf(1000L)))
:
taskConfig.getDumperContext().getCommonContext().getPosition());
return result;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 6e2d2a35607..dd50df7332c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -49,8 +49,8 @@ import java.util.Properties;
import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;
@@ -101,8 +101,8 @@ class ConsistencyCheckJobExecutorCallbackTest {
result.setSplittingItem(0);
result.setSourceDataNode("ds_0.t_order");
result.setLogicTableName("t_order");
- result.setSourceRange(new UnsupportedKeyIngestPosition().toString());
- result.setTargetRange(new UnsupportedKeyIngestPosition().toString());
+ result.setSourceRange(UniqueKeyIngestPosition.ofUnsplit().encode());
+ result.setTargetRange(UniqueKeyIngestPosition.ofUnsplit().encode());
result.setSourcePosition(100);
result.setTargetPosition(100);
return result;