This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4267dd7b28d Add UniqueKeyIngestPosition to replace PrimaryKeyIngestPosition and PrimaryKeyIngestPositionFactory (#37589)
4267dd7b28d is described below

commit 4267dd7b28d0477d62b62f435ce9c0c38cdeae43
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Dec 31 14:01:35 2025 +0800

    Add UniqueKeyIngestPosition to replace PrimaryKeyIngestPosition and PrimaryKeyIngestPositionFactory (#37589)
    
    * Rename to unified lowerBound and upperBound
    
    * Add UniqueKeyIngestPosition
    
    * Add UniqueKeyIngestPositionTest
    
    * Replace PrimaryKeyIngestPositionFactory with UniqueKeyIngestPosition
    
    * Replace IntegerPrimaryKeyIngestPosition with UniqueKeyIngestPosition
    
    * Replace StringPrimaryKeyIngestPosition with UniqueKeyIngestPosition
    
    * Replace UnsupportedKeyIngestPosition with UniqueKeyIngestPosition
    
    * Replace PrimaryKeyIngestPosition with UniqueKeyIngestPosition
    
    * Add UniqueKeyIngestPosition toString()
    
    * Update code style & UT
---
 .../position/TableCheckRangePosition.java          |   6 +-
 .../yaml/YamlTableCheckRangePositionSwapper.java   |   4 +-
 .../table/MatchingTableInventoryChecker.java       |   8 +-
 .../ingest/dumper/inventory/InventoryDumper.java   |   9 +-
 ...niqueKeyInventoryDataRecordPositionCreator.java |   8 +-
 .../query/IntegerRangeSplittingIterator.java       |   2 +-
 .../position/type/pk/PrimaryKeyIngestPosition.java |  49 ------
 .../type/pk/PrimaryKeyIngestPositionFactory.java   |  88 ----------
 .../position/type/pk/UniqueKeyIngestPosition.java  | 180 +++++++++++++++++++++
 .../pk/type/IntegerPrimaryKeyIngestPosition.java   |  57 -------
 .../pk/type/StringPrimaryKeyIngestPosition.java    |  48 ------
 .../type/pk/type/UnsupportedKeyIngestPosition.java |  46 ------
 .../YamlJobItemInventoryTasksProgressSwapper.java  |   4 +-
 .../position/InventoryPositionCalculator.java      |   4 +-
 .../InventoryPositionEstimatedCalculator.java      |   7 +-
 .../position/exact/DataTypePositionHandler.java    |   4 +-
 .../position/exact/IntegerPositionHandler.java     |   7 +-
 .../exact/InventoryPositionExactCalculator.java    |  12 +-
 .../position/exact/StringPositionHandler.java      |   8 +-
 .../splitter/InventoryDumperContextSplitter.java   |   4 +-
 ...eKeyInventoryDataRecordPositionCreatorTest.java |  10 +-
 .../pk/PrimaryKeyIngestPositionFactoryTest.java    | 113 -------------
 .../type/pk/UniqueKeyIngestPositionTest.java       | 149 +++++++++++++++++
 .../type/IntegerPrimaryKeyIngestPositionTest.java  |  40 -----
 .../type/StringPrimaryKeyIngestPositionTest.java   |  36 -----
 .../pk/type/UnsupportedKeyIngestPositionTest.java  |  31 ----
 .../progress/TransmissionJobItemProgressTest.java  |  19 ++-
 .../InventoryDataSparsenessCalculatorTest.java     |  16 +-
 .../InventoryPositionEstimatedCalculatorTest.java  |  28 ++--
 .../cdc/util/DataRecordResultConvertUtilsTest.java |   5 +-
 .../ConsistencyCheckJobItemContextTest.java        |  16 +-
 .../MigrationDataConsistencyChecker.java           |   4 +-
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |   4 +-
 ...nventoryIntegerPositionExactCalculatorTest.java |  29 ++--
 ...InventoryStringPositionExactCalculatorTest.java |  29 ++--
 .../splitter/InventoryTaskSplitterTest.java        |  16 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |   7 +-
 .../ConsistencyCheckJobExecutorCallbackTest.java   |   8 +-
 38 files changed, 472 insertions(+), 643 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
index 430bc6f4e4e..983a04288b2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
@@ -22,7 +22,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 
 /**
  * Table check range position.
@@ -40,9 +40,9 @@ public final class TableCheckRangePosition {
     
     private final String logicTableName;
     
-    private final PrimaryKeyIngestPosition<?> sourceRange;
+    private final UniqueKeyIngestPosition<?> sourceRange;
     
-    private final PrimaryKeyIngestPosition<?> targetRange;
+    private final UniqueKeyIngestPosition<?> targetRange;
     
     private final String queryCondition;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
index 1236cd785fc..2e43175d7f5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
 
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
@@ -45,7 +45,7 @@ public final class YamlTableCheckRangePositionSwapper 
implements YamlConfigurati
     @Override
     public TableCheckRangePosition swapToObject(final 
YamlTableCheckRangePosition yamlConfig) {
         return new TableCheckRangePosition(yamlConfig.getSplittingItem(), 
yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(),
-                
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()), 
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()),
+                UniqueKeyIngestPosition.decode(yamlConfig.getSourceRange()), 
UniqueKeyIngestPosition.decode(yamlConfig.getTargetRange()),
                 yamlConfig.getQueryCondition(), 
yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(), 
yamlConfig.isFinished(), yamlConfig.getMatched());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index dc856cb2dbb..d92c397660b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -77,8 +77,8 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
         TableInventoryCalculateParameter sourceParam = new 
TableInventoryCalculateParameter(param.getSourceDataSource(), 
param.getSourceTable(),
                 param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY, param.getQueryCondition());
         TableCheckRangePosition checkRangePosition = 
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
-        sourceParam.setRange(Range.closed(null != 
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() 
: checkRangePosition.getSourceRange().getBeginValue(),
-                checkRangePosition.getSourceRange().getEndValue()));
+        sourceParam.setRange(Range.closed(null != 
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() 
: checkRangePosition.getSourceRange().getLowerBound(),
+                checkRangePosition.getSourceRange().getUpperBound()));
         TableInventoryCalculateParameter targetParam = 
getTableInventoryCalculateParameter(param, checkRangePosition);
         TableInventoryCalculator<TableInventoryCheckCalculatedResult> 
sourceCalculator = buildSingleTableInventoryCalculator();
         this.sourceCalculator = sourceCalculator;
@@ -132,8 +132,8 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     private TableInventoryCalculateParameter 
getTableInventoryCalculateParameter(final TableInventoryCheckParameter param, 
final TableCheckRangePosition checkRangePosition) {
         TableInventoryCalculateParameter result = new 
TableInventoryCalculateParameter(param.getTargetDataSource(), 
param.getTargetTable(),
                 param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY, param.getQueryCondition());
-        result.setRange(Range.closed(null != 
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() 
: checkRangePosition.getTargetRange().getBeginValue(),
-                checkRangePosition.getTargetRange().getEndValue()));
+        result.setRange(Range.closed(null != 
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() 
: checkRangePosition.getTargetRange().getLowerBound(),
+                checkRangePosition.getTargetRange().getUpperBound()));
         return result;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index dc8995ebc09..7f8d3361b2e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -27,15 +27,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLife
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -124,7 +123,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         List<String> columnNames = dumperContext.getQueryColumnNames();
         TableInventoryCalculateParameter calculateParam = new 
TableInventoryCalculateParameter(dataSource, table,
                 columnNames, dumperContext.getUniqueKeyColumns(), 
QueryType.RANGE_QUERY, null);
-        Range<?> range = Range.closed(((PrimaryKeyIngestPosition<?>) 
initialPosition).getBeginValue(), ((PrimaryKeyIngestPosition<?>) 
initialPosition).getEndValue());
+        Range<?> range = Range.closed(((UniqueKeyIngestPosition<?>) 
initialPosition).getLowerBound(), ((UniqueKeyIngestPosition<?>) 
initialPosition).getUpperBound());
         calculateParam.setRange(range);
         RecordTableInventoryDumpCalculator dumpCalculator = new 
RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(), 
StreamingRangeType.SMALL);
         long rowCount = 0L;
@@ -132,7 +131,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
             String firstUniqueKey = 
calculateParam.getFirstUniqueKey().getName();
             for (List<DataRecord> each : 
dumpCalculator.calculate(calculateParam)) {
                 channel.push(Collections.unmodifiableList(each));
-                IngestPosition position = 
PrimaryKeyIngestPositionFactory.newInstance(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size()
 - 1), firstUniqueKey), range.getUpperBound());
+                IngestPosition position = 
UniqueKeyIngestPosition.newInstance(Range.closed(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size()
 - 1), firstUniqueKey), range.getUpperBound()));
                 dumperContext.getCommonContext().setPosition(position);
                 rowCount += each.size();
             }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
index a4d2fcf032e..3d454cde3fb 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
@@ -19,9 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.pos
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -33,7 +33,7 @@ public final class 
UniqueKeyInventoryDataRecordPositionCreator implements Invent
     
     @Override
     public IngestPosition create(final InventoryDumperContext dumperContext, 
final ResultSet resultSet) throws SQLException {
-        return PrimaryKeyIngestPositionFactory.newInstance(
-                
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), 
((PrimaryKeyIngestPosition<?>) 
dumperContext.getCommonContext().getPosition()).getEndValue());
+        return 
UniqueKeyIngestPosition.newInstance(Range.closed(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()),
+                ((UniqueKeyIngestPosition<?>) 
dumperContext.getCommonContext().getPosition()).getUpperBound()));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
index 51916a73bc6..91028b5657b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/IntegerRangeSplittingIterator.java
@@ -36,7 +36,7 @@ public final class IntegerRangeSplittingIterator implements 
Iterator<Range<BigIn
     
     public IntegerRangeSplittingIterator(final BigInteger lowerBound, final 
BigInteger upperBound, final BigInteger stepSize) {
         if (lowerBound.compareTo(upperBound) > 0) {
-            throw new IllegalArgumentException("lower bounder greater than 
upper bound");
+            throw new IllegalArgumentException("lower bound greater than upper 
bound");
         }
         if (stepSize.compareTo(BigInteger.ZERO) < 0) {
             throw new IllegalArgumentException("step size is less than zero");
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
deleted file mode 100644
index 1a07b3fdf61..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-
-/**
- * Primary key ingest position.
- * 
- * @param <T> type of value
- */
-public interface PrimaryKeyIngestPosition<T> extends IngestPosition {
-    
-    /**
-     * Get begin value.
-     *
-     * @return begin value
-     */
-    T getBeginValue();
-    
-    /**
-     * Get end value.
-     *
-     * @return end value
-     */
-    T getEndValue();
-    
-    /**
-     * Get type.
-     *
-     * @return type
-     */
-    char getType();
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
deleted file mode 100644
index c2ba6deabdd..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
-
-import java.math.BigInteger;
-import java.util.List;
-
-/**
- * Primary key ingest position factory.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PrimaryKeyIngestPositionFactory {
-    
-    /**
-     * Create new instance by string data.
-     *
-     * @param data string data
-     * @return primary key position
-     * @throws IllegalArgumentException illegal argument exception
-     */
-    public static PrimaryKeyIngestPosition<?> newInstance(final String data) {
-        List<String> parts = Splitter.on(',').splitToList(data);
-        Preconditions.checkArgument(3 == parts.size(), "Unknown primary key 
position: " + data);
-        Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid 
primary key position type: " + parts.get(0));
-        char type = parts.get(0).charAt(0);
-        String beginValue = parts.get(1);
-        String endValue = parts.get(2);
-        switch (type) {
-            case 'i':
-                return new 
IntegerPrimaryKeyIngestPosition(Strings.isNullOrEmpty(beginValue) ? null : new 
BigInteger(beginValue), Strings.isNullOrEmpty(endValue) ? null : new 
BigInteger(endValue));
-            case 's':
-                return new StringPrimaryKeyIngestPosition(beginValue, 
endValue);
-            case 'u':
-                return new UnsupportedKeyIngestPosition();
-            default:
-                throw new IllegalArgumentException("Unknown primary key 
position type: " + type);
-        }
-    }
-    
-    /**
-     * New instance by begin value and end value.
-     *
-     * @param beginValue begin value
-     * @param endValue end value
-     * @return ingest position
-     */
-    public static PrimaryKeyIngestPosition<?> newInstance(final Object 
beginValue, final Object endValue) {
-        if (beginValue instanceof Number) {
-            return new 
IntegerPrimaryKeyIngestPosition(convertToBigInteger((Number) beginValue), null 
== endValue ? null : convertToBigInteger((Number) endValue));
-        }
-        if (beginValue instanceof CharSequence) {
-            return new StringPrimaryKeyIngestPosition(beginValue.toString(), 
null == endValue ? null : endValue.toString());
-        }
-        // TODO support more types, e.g. byte[] (MySQL varbinary)
-        return new UnsupportedKeyIngestPosition();
-    }
-    
-    private static BigInteger convertToBigInteger(final Number number) {
-        if (number instanceof BigInteger) {
-            return (BigInteger) number;
-        }
-        return BigInteger.valueOf(number.longValue());
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
new file mode 100644
index 00000000000..453146d8d9a
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPosition.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+
+import java.math.BigInteger;
+import java.util.List;
+
+/**
+ * Unique key ingest position.
+ *
+ * @param <T> type of value
+ */
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public final class UniqueKeyIngestPosition<T> implements IngestPosition {
+    
+    @Getter
+    private final char type;
+    
+    private final Range<T> range;
+    
+    /**
+     * New instance by lower bound and upper bound.
+     *
+     * @param <T> type of value
+     * @param range range
+     * @return ingest position
+     */
+    public static <T> UniqueKeyIngestPosition<?> newInstance(final Range<T> 
range) {
+        T lowerBound = range.getLowerBound();
+        T upperBound = range.getUpperBound();
+        if (lowerBound instanceof Number) {
+            BigInteger lower = convertToBigInteger((Number) lowerBound);
+            BigInteger upper = null == upperBound ? null : 
convertToBigInteger((Number) upperBound);
+            return ofInteger(range.isLowerInclusive() ? Range.closed(lower, 
upper) : Range.openClosed(lower, upper));
+        }
+        if (lowerBound instanceof CharSequence) {
+            String lower = lowerBound.toString();
+            String upper = null == upperBound ? null : upperBound.toString();
+            return ofString(range.isLowerInclusive() ? Range.closed(lower, 
upper) : Range.openClosed(lower, upper));
+        }
+        // TODO support more types, e.g. byte[] (MySQL varbinary)
+        return ofUnsplit();
+    }
+    
+    private static BigInteger convertToBigInteger(final Number number) {
+        if (number instanceof BigInteger) {
+            return (BigInteger) number;
+        }
+        return BigInteger.valueOf(number.longValue());
+    }
+    
+    /**
+     * Create integer unique key ingest position.
+     *
+     * @param range range
+     * @return integer unique key ingest position
+     */
+    public static UniqueKeyIngestPosition<BigInteger> ofInteger(final 
Range<BigInteger> range) {
+        return new UniqueKeyIngestPosition<>('i', range);
+    }
+    
+    /**
+     * Create string unique key ingest position.
+     *
+     * @param range range
+     * @return string unique key ingest position
+     */
+    public static UniqueKeyIngestPosition<String> ofString(final Range<String> 
range) {
+        return new UniqueKeyIngestPosition<>('s', range);
+    }
+    
+    /**
+     * Create unsplit unique key ingest position.
+     *
+     * @return unsplit unique key ingest position
+     */
+    public static UniqueKeyIngestPosition<Void> ofUnsplit() {
+        return new UniqueKeyIngestPosition<>('u', Range.closed(null, null));
+    }
+    
+    /**
+     * Create new instance by text.
+     *
+     * @param text text
+     * @return unique key position
+     * @throws IllegalArgumentException illegal argument exception
+     */
+    public static UniqueKeyIngestPosition<?> decode(final String text) {
+        List<String> parts = Splitter.on(',').splitToList(text);
+        Preconditions.checkArgument(3 == parts.size(), "Unknown unique key 
position: " + text);
+        Preconditions.checkArgument(1 == parts.get(0).length(), "Invalid 
unique key position type: " + parts.get(0));
+        char type = parts.get(0).charAt(0);
+        String lowerBound = parts.get(1);
+        String upperBound = parts.get(2);
+        switch (type) {
+            case 'i':
+                BigInteger lower = Strings.isNullOrEmpty(lowerBound) ? null : 
new BigInteger(lowerBound);
+                BigInteger upper = Strings.isNullOrEmpty(upperBound) ? null : 
new BigInteger(upperBound);
+                return ofInteger(Range.closed(lower, upper));
+            case 's':
+                return ofString(Range.closed(lowerBound, upperBound));
+            case 'u':
+                return ofUnsplit();
+            default:
+                throw new IllegalArgumentException("Unknown unique key 
position type: " + type);
+        }
+    }
+    
+    /**
+     * Encode to text.
+     *
+     * @return encoded text
+     * @throws RuntimeException runtime exception
+     */
+    public String encode() {
+        T lowerBound = getLowerBound();
+        T upperBound = getUpperBound();
+        String encodedLowerBound;
+        String encodedUpperBound;
+        switch (getType()) {
+            case 'i':
+            case 's':
+            case 'u':
+                encodedLowerBound = null == lowerBound ? "" : 
lowerBound.toString();
+                encodedUpperBound = null == upperBound ? "" : 
upperBound.toString();
+                break;
+            default:
+                throw new RuntimeException("Unknown unique key position type: 
" + getType());
+        }
+        return String.format("%s,%s,%s", getType(), encodedLowerBound, 
encodedUpperBound);
+    }
+    
+    /**
+     * Get lower bound.
+     *
+     * @return lower bound
+     */
+    public T getLowerBound() {
+        return range.getLowerBound();
+    }
+    
+    /**
+     * Get upper bound.
+     *
+     * @return upper bound
+     */
+    public T getUpperBound() {
+        return range.getUpperBound();
+    }
+    
+    @Override
+    public String toString() {
+        // TODO Add encode() method in IngestPosition interface, and remove 
.toString() invocations.
+        return encode();
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
deleted file mode 100644
index c82121c2025..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-import java.math.BigInteger;
-
-/**
- * Integer primary key ingest position.
- */
-public final class IntegerPrimaryKeyIngestPosition implements 
PrimaryKeyIngestPosition<BigInteger> {
-    
-    private final BigInteger beginValue;
-    
-    private final BigInteger endValue;
-    
-    public IntegerPrimaryKeyIngestPosition(final BigInteger beginValue, final 
BigInteger endValue) {
-        this.beginValue = beginValue;
-        this.endValue = endValue;
-    }
-    
-    @Override
-    public BigInteger getBeginValue() {
-        return beginValue;
-    }
-    
-    @Override
-    public BigInteger getEndValue() {
-        return endValue;
-    }
-    
-    @Override
-    public char getType() {
-        return 'i';
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("%s,%s,%s", getType(), null == beginValue ? "" : 
beginValue, null == endValue ? "" : endValue);
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
deleted file mode 100644
index 96cf79ae708..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPosition.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import com.google.common.base.Strings;
-import lombok.Getter;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-/**
- * String primary key ingest position.
- */
-@Getter
-public final class StringPrimaryKeyIngestPosition implements 
PrimaryKeyIngestPosition<String> {
-    
-    private final String beginValue;
-    
-    private final String endValue;
-    
-    public StringPrimaryKeyIngestPosition(final String beginValue, final 
String endValue) {
-        this.beginValue = Strings.emptyToNull(beginValue);
-        this.endValue = Strings.emptyToNull(endValue);
-    }
-    
-    @Override
-    public char getType() {
-        return 's';
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("%s,%s,%s", getType(), null == beginValue ? "" : 
beginValue, null == endValue ? "" : endValue);
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
deleted file mode 100644
index 8406c07518d..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPosition.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-
-/**
- * Unsupported key ingest position.
- */
-public final class UnsupportedKeyIngestPosition implements 
PrimaryKeyIngestPosition<Void> {
-    
-    @Override
-    public Void getBeginValue() {
-        return null;
-    }
-    
-    @Override
-    public Void getEndValue() {
-        return null;
-    }
-    
-    @Override
-    public char getType() {
-        return 'u';
-    }
-    
-    @Override
-    public String toString() {
-        return String.format("%s,,", getType());
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
index e3c464b8f66..53accc9d52f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlJobItemInventoryTasksProgressSwapper.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
 
 import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemInventoryTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlJobItemInventoryTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
@@ -83,6 +83,6 @@ public final class YamlJobItemInventoryTasksProgressSwapper {
     
     private Function<Entry<String, String>, InventoryTaskProgress> 
getInventoryTaskProgressFunction() {
         return entry -> new InventoryTaskProgress(
-                Strings.isNullOrEmpty(entry.getValue()) ? new 
IngestPlaceholderPosition() : 
PrimaryKeyIngestPositionFactory.newInstance(entry.getValue()));
+                Strings.isNullOrEmpty(entry.getValue()) ? new 
IngestPlaceholderPosition() : UniqueKeyIngestPosition.decode(entry.getValue()));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
index 17ca03b6b5d..baf3743c797 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/InventoryPositionCalculator.java
@@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.InventoryDataSparsenessCalculator;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.estimated.InventoryPositionEstimatedCalculator;
@@ -66,7 +66,7 @@ public final class InventoryPositionCalculator {
         if (dataTypeOption.isStringDataType(firstColumnDataType)) {
             return getStringPositions();
         }
-        return Collections.singletonList(new UnsupportedKeyIngestPosition());
+        return Collections.singletonList(UniqueKeyIngestPosition.ofUnsplit());
     }
     
     private List<IngestPosition> getIntegerPositions() {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
index 7889168491e..c35e231b6da 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculator.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJ
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.IntegerRangeSplittingIterator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
@@ -81,15 +81,14 @@ public final class InventoryPositionEstimatedCalculator {
         BigInteger lowerBound = uniqueKeyValuesRange.getLowerBound();
         BigInteger upperBound = uniqueKeyValuesRange.getUpperBound();
         if (0 == tableRecordsCount || null == lowerBound || null == 
upperBound) {
-            return Collections.singletonList(new 
IntegerPrimaryKeyIngestPosition(null, null));
+            return 
Collections.singletonList(UniqueKeyIngestPosition.ofInteger(Range.closed(null, 
null)));
         }
         List<IngestPosition> result = new LinkedList<>();
         long splitCount = tableRecordsCount / shardingSize + 
(tableRecordsCount % shardingSize > 0 ? 1 : 0);
         BigInteger stepSize = 
upperBound.subtract(lowerBound).divide(BigInteger.valueOf(splitCount));
         IntegerRangeSplittingIterator rangeIterator = new 
IntegerRangeSplittingIterator(lowerBound, upperBound, stepSize);
         while (rangeIterator.hasNext()) {
-            Range<BigInteger> range = rangeIterator.next();
-            result.add(new 
IntegerPrimaryKeyIngestPosition(range.getLowerBound(), range.getUpperBound()));
+            
result.add(UniqueKeyIngestPosition.ofInteger(rangeIterator.next()));
         }
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
index 591c201e64b..040a715eae8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
@@ -17,7 +17,7 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -57,5 +57,5 @@ public interface DataTypePositionHandler<T> {
      * @param upperBound upper bound
      * @return ingest position
      */
-    PrimaryKeyIngestPosition<T> createIngestPosition(T lowerBound, T 
upperBound);
+    UniqueKeyIngestPosition<T> createIngestPosition(T lowerBound, T 
upperBound);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
index f401132e436..0febb10491d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/IntegerPositionHandler.java
@@ -17,7 +17,8 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -42,7 +43,7 @@ public final class IntegerPositionHandler implements 
DataTypePositionHandler<Big
     }
     
     @Override
-    public IntegerPrimaryKeyIngestPosition createIngestPosition(final 
BigInteger lowerBound, final BigInteger upperBound) {
-        return new IntegerPrimaryKeyIngestPosition(lowerBound, upperBound);
+    public UniqueKeyIngestPosition<BigInteger> createIngestPosition(final 
BigInteger lowerBound, final BigInteger upperBound) {
+        return UniqueKeyIngestPosition.ofInteger(Range.closed(lowerBound, 
upperBound));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
index 6d4440e677b..cc53f6c86f9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
@@ -56,14 +56,14 @@ public final class InventoryPositionExactCalculator {
     public static <T> List<IngestPosition> getPositions(final QualifiedTable 
qualifiedTable, final String uniqueKey, final int shardingSize,
                                                         final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
         List<IngestPosition> result = new LinkedList<>();
-        PrimaryKeyIngestPosition<T> firstPosition = 
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource, 
positionHandler);
+        UniqueKeyIngestPosition<T> firstPosition = 
getFirstPosition(qualifiedTable, uniqueKey, shardingSize, dataSource, 
positionHandler);
         result.add(firstPosition);
         result.addAll(getLeftPositions(qualifiedTable, uniqueKey, 
shardingSize, firstPosition, dataSource, positionHandler));
         return result;
     }
     
-    private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final 
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
-                                                                    final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
+    private static <T> UniqueKeyIngestPosition<T> getFirstPosition(final 
QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+                                                                   final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
         String firstQuerySQL = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
                 
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName(), uniqueKey, false);
         try (
@@ -90,10 +90,10 @@ public final class InventoryPositionExactCalculator {
     }
     
     private static <T> List<IngestPosition> getLeftPositions(final 
QualifiedTable qualifiedTable, final String uniqueKey,
-                                                             final int 
shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
+                                                             final int 
shardingSize, final UniqueKeyIngestPosition<T> firstPosition,
                                                              final 
PipelineDataSource dataSource, final DataTypePositionHandler<T> 
positionHandler) {
         List<IngestPosition> result = new LinkedList<>();
-        T lowerBound = firstPosition.getEndValue();
+        T lowerBound = firstPosition.getUpperBound();
         long recordsCount = 0;
         String laterQuerySQL = new 
PipelinePrepareSQLBuilder(dataSource.getDatabaseType())
                 
.buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName(), uniqueKey, true);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
index 331503f7c24..43086e37d10 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/StringPositionHandler.java
@@ -17,8 +17,8 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -40,7 +40,7 @@ public final class StringPositionHandler implements 
DataTypePositionHandler<Stri
     }
     
     @Override
-    public PrimaryKeyIngestPosition<String> createIngestPosition(final String 
lowerBound, final String upperBound) {
-        return new StringPrimaryKeyIngestPosition(lowerBound, upperBound);
+    public UniqueKeyIngestPosition<String> createIngestPosition(final String 
lowerBound, final String upperBound) {
+        return UniqueKeyIngestPosition.ofString(Range.closed(lowerBound, 
upperBound));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 34fae406389..4ef23e9e6df 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
@@ -107,7 +107,7 @@ public final class InventoryDumperContextSplitter {
         long tableRecordsCount = 
InventoryRecordsCountCalculator.getTableRecordsCount(dumperContext, dataSource);
         jobItemContext.updateInventoryRecordsCount(tableRecordsCount);
         if (!dumperContext.hasUniqueKey()) {
-            return Collections.singleton(new UnsupportedKeyIngestPosition());
+            return Collections.singleton(UniqueKeyIngestPosition.ofUnsplit());
         }
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         int shardingSize = 
jobItemContext.getJobProcessContext().getProcessConfiguration().getRead().getShardingSize();
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
index eb3a7753c17..9a0298edeae 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreatorTest.java
@@ -18,8 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.junit.jupiter.api.Test;
 
 import java.sql.ResultSet;
@@ -35,14 +34,13 @@ class UniqueKeyInventoryDataRecordPositionCreatorTest {
     
     @Test
     void assertCreate() throws SQLException {
-        assertThat(new 
UniqueKeyInventoryDataRecordPositionCreator().create(mockInventoryDumperContext(),
 mock(ResultSet.class)), isA(UnsupportedKeyIngestPosition.class));
+        assertThat(new 
UniqueKeyInventoryDataRecordPositionCreator().create(mockInventoryDumperContext(),
 mock(ResultSet.class)), isA(UniqueKeyIngestPosition.class));
     }
     
-    @SuppressWarnings("unchecked")
     private InventoryDumperContext mockInventoryDumperContext() {
         InventoryDumperContext result = mock(InventoryDumperContext.class, 
RETURNS_DEEP_STUBS);
-        PrimaryKeyIngestPosition<Object> ingestPosition = 
mock(PrimaryKeyIngestPosition.class);
-        when(ingestPosition.getEndValue()).thenReturn(new Object());
+        UniqueKeyIngestPosition<?> ingestPosition = 
mock(UniqueKeyIngestPosition.class);
+        when(ingestPosition.getUpperBound()).thenReturn(null);
         
when(result.getCommonContext().getPosition()).thenReturn(ingestPosition);
         return result;
     }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
deleted file mode 100644
index a67a7034762..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/PrimaryKeyIngestPositionFactoryTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
-
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
-import org.junit.jupiter.api.Test;
-
-import java.math.BigInteger;
-import java.util.Collections;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-class PrimaryKeyIngestPositionFactoryTest {
-    
-    @Test
-    void assertNewInstanceWithIntegerPrimaryKeyIngestPosition() {
-        
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,200"),
 new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L), 
BigInteger.valueOf(200L)));
-        
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,100,"),
 new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(100L), null));
-        
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,200"),
 new IntegerPrimaryKeyIngestPosition(null, BigInteger.valueOf(200L)));
-        
assertIntegerPrimaryKeyIngestPosition0(PrimaryKeyIngestPositionFactory.newInstance("i,,"),
 new IntegerPrimaryKeyIngestPosition(null, null));
-    }
-    
-    private void assertIntegerPrimaryKeyIngestPosition0(final 
PrimaryKeyIngestPosition<?> actual, final IntegerPrimaryKeyIngestPosition 
expected) {
-        assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
-        assertThat(actual.getType(), is(expected.getType()));
-        assertThat(actual.getBeginValue(), is(expected.getBeginValue()));
-        assertThat(actual.getEndValue(), is(expected.getEndValue()));
-    }
-    
-    @Test
-    void assertNewInstanceWithStringPrimaryKeyIngestPosition() {
-        StringPrimaryKeyIngestPosition actual = 
(StringPrimaryKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance("s,a,b");
-        assertThat(actual.getType(), is('s'));
-        assertThat(actual.getBeginValue(), is("a"));
-        assertThat(actual.getEndValue(), is("b"));
-    }
-    
-    @Test
-    void assertNewInstanceWithUnsupportedKeyIngestPosition() {
-        UnsupportedKeyIngestPosition actual = (UnsupportedKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance("u,a,b");
-        assertThat(actual.getType(), is('u'));
-        assertNull(actual.getBeginValue());
-        assertNull(actual.getEndValue());
-    }
-    
-    @Test
-    void assertNewInstanceWithIllegalArgument() {
-        assertThrows(IllegalArgumentException.class, () -> 
PrimaryKeyIngestPositionFactory.newInstance("z,100"));
-        assertThrows(IllegalArgumentException.class, () -> 
PrimaryKeyIngestPositionFactory.newInstance("zz,100,200"));
-        assertThrows(IllegalArgumentException.class, () -> 
PrimaryKeyIngestPositionFactory.newInstance("z,100,200"));
-    }
-    
-    @Test
-    void assertNewInstanceWithNumberRange() {
-        IntegerPrimaryKeyIngestPosition actual = 
(IntegerPrimaryKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance(100, 200);
-        assertThat(actual.getType(), is('i'));
-        assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
-        assertThat(actual.getEndValue(), is(BigInteger.valueOf(200L)));
-    }
-    
-    @Test
-    void assertNewInstanceWithNumberNullEndRange() {
-        IntegerPrimaryKeyIngestPosition actual = 
(IntegerPrimaryKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance(100, null);
-        assertThat(actual.getType(), is('i'));
-        assertThat(actual.getBeginValue(), is(BigInteger.valueOf(100L)));
-        assertNull(actual.getEndValue());
-    }
-    
-    @Test
-    void assertNewInstanceWithCharRange() {
-        StringPrimaryKeyIngestPosition actual = 
(StringPrimaryKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance("a", "b");
-        assertThat(actual.getType(), is('s'));
-        assertThat(actual.getBeginValue(), is("a"));
-        assertThat(actual.getEndValue(), is("b"));
-    }
-    
-    @Test
-    void assertNewInstanceWithCharNullEndRange() {
-        StringPrimaryKeyIngestPosition actual = 
(StringPrimaryKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance("a", null);
-        assertThat(actual.getType(), is('s'));
-        assertThat(actual.getBeginValue(), is("a"));
-        assertNull(actual.getEndValue());
-    }
-    
-    @Test
-    void assertNewInstanceWithUnsupportedRange() {
-        UnsupportedKeyIngestPosition actual = (UnsupportedKeyIngestPosition) 
PrimaryKeyIngestPositionFactory.newInstance(Collections.emptyList(), 
Collections.emptyList());
-        assertThat(actual.getType(), is('u'));
-        assertNull(actual.getBeginValue());
-        assertNull(actual.getEndValue());
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
new file mode 100644
index 00000000000..fb15f5e0cbc
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/UniqueKeyIngestPositionTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class UniqueKeyIngestPositionTest {
+    
+    @Test
+    void assertEncodeInteger() {
+        assertThat(createIntegerPosition(1L, 100L).encode(), is("i,1,100"));
+    }
+    
+    private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final 
Long lowerBound, final Long upperBound) {
+        return UniqueKeyIngestPosition.ofInteger(Range.closed(null == 
lowerBound ? null : BigInteger.valueOf(lowerBound), null == upperBound ? null : 
BigInteger.valueOf(upperBound)));
+    }
+    
+    @Test
+    void assertEncodeIntegerWithNullValue() {
+        assertThat(createIntegerPosition(1L, null).encode(), is("i,1,"));
+        assertThat(createIntegerPosition(null, 100L).encode(), is("i,,100"));
+        assertThat(createIntegerPosition(null, null).encode(), is("i,,"));
+    }
+    
+    @Test
+    void assertEncodeBigInteger() {
+        assertThat(UniqueKeyIngestPosition.ofInteger(Range.closed(new 
BigInteger("12345678901234567890"), new 
BigInteger("12345678901234567891"))).encode(),
+                is("i,12345678901234567890,12345678901234567891"));
+    }
+    
+    @Test
+    void assertEncodeString() {
+        assertThat(UniqueKeyIngestPosition.ofString(Range.closed("hi", 
"jk")).encode(), is("s,hi,jk"));
+    }
+    
+    @Test
+    void assertEncodeStringWithNullValue() {
+        assertThat(UniqueKeyIngestPosition.ofString(Range.closed(null, 
null)).encode(), is("s,,"));
+    }
+    
+    @Test
+    void assertEncodeUnsplit() {
+        assertThat(UniqueKeyIngestPosition.ofUnsplit().encode(), is("u,,"));
+    }
+    
+    @Test
+    void assertDecodeIntegerPosition() {
+        assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,100,200"), 
createIntegerPosition(100L, 200L));
+        assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,100,"), 
createIntegerPosition(100L, null));
+        assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,,200"), 
createIntegerPosition(null, 200L));
+        assertIntegerPosition0(UniqueKeyIngestPosition.decode("i,,"), 
createIntegerPosition(null, null));
+    }
+    
+    private void assertIntegerPosition0(final UniqueKeyIngestPosition<?> 
actual, final UniqueKeyIngestPosition<BigInteger> expected) {
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        assertThat(actual.getType(), is(expected.getType()));
+        assertThat(actual.getLowerBound(), is(expected.getLowerBound()));
+        assertThat(actual.getUpperBound(), is(expected.getUpperBound()));
+    }
+    
+    @Test
+    void assertDecodeStringPosition() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.decode("s,a,b");
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        assertThat(actual.getType(), is('s'));
+        assertThat(actual.getLowerBound(), is("a"));
+        assertThat(actual.getUpperBound(), is("b"));
+    }
+    
+    @Test
+    void assertDecodeUnsplitPosition() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.decode("u,a,b");
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        assertThat(actual.getType(), is('u'));
+        assertNull(actual.getLowerBound());
+        assertNull(actual.getUpperBound());
+    }
+    
+    @Test
+    void assertDecodeIllegalArgument() {
+        assertThrows(IllegalArgumentException.class, () -> 
UniqueKeyIngestPosition.decode("z,100"));
+        assertThrows(IllegalArgumentException.class, () -> 
UniqueKeyIngestPosition.decode("zz,100,200"));
+        assertThrows(IllegalArgumentException.class, () -> 
UniqueKeyIngestPosition.decode("z,100,200"));
+    }
+    
+    @Test
+    void assertNewInstanceWithNumberRange() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.newInstance(Range.closed(BigInteger.valueOf(100L), 
BigInteger.valueOf(200L)));
+        assertThat(actual.getType(), is('i'));
+        assertThat(actual.getLowerBound(), is(BigInteger.valueOf(100L)));
+        assertThat(actual.getUpperBound(), is(BigInteger.valueOf(200L)));
+    }
+    
+    @Test
+    void assertNewInstanceWithNumberNullEndRange() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.newInstance(Range.closed(BigInteger.valueOf(100L), 
null));
+        assertThat(actual.getType(), is('i'));
+        assertThat(actual.getLowerBound(), is(BigInteger.valueOf(100L)));
+        assertNull(actual.getUpperBound());
+    }
+    
+    @Test
+    void assertNewInstanceWithStringRange() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.newInstance(Range.closed("a", "b"));
+        assertThat(actual.getType(), is('s'));
+        assertThat(actual.getLowerBound(), is("a"));
+        assertThat(actual.getUpperBound(), is("b"));
+    }
+    
+    @Test
+    void assertNewInstanceWithStringNullEndRange() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.newInstance(Range.closed("a", null));
+        assertThat(actual.getType(), is('s'));
+        assertThat(actual.getLowerBound(), is("a"));
+        assertNull(actual.getUpperBound());
+    }
+    
+    @Test
+    void assertNewInstanceWithUnsplitRange() {
+        UniqueKeyIngestPosition<?> actual = 
UniqueKeyIngestPosition.newInstance(Range.closed(null, null));
+        assertThat(actual.getType(), is('u'));
+        assertNull(actual.getLowerBound());
+        assertNull(actual.getUpperBound());
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
deleted file mode 100644
index 0aad9848b0d..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/IntegerPrimaryKeyIngestPositionTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import java.math.BigInteger;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class IntegerPrimaryKeyIngestPositionTest {
-    
-    @Test
-    void assertToString() {
-        assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE, 
BigInteger.valueOf(100L)).toString(), is("i,1,100"));
-    }
-    
-    @Test
-    void assertToStringWithNullValue() {
-        assertThat(new IntegerPrimaryKeyIngestPosition(BigInteger.ONE, 
null).toString(), is("i,1,"));
-        assertThat(new IntegerPrimaryKeyIngestPosition(null, 
BigInteger.valueOf(100L)).toString(), is("i,,100"));
-        assertThat(new IntegerPrimaryKeyIngestPosition(null, null).toString(), 
is("i,,"));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
deleted file mode 100644
index 8cea55acf3e..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/StringPrimaryKeyIngestPositionTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class StringPrimaryKeyIngestPositionTest {
-    
-    @Test
-    void assertToString() {
-        assertThat(new StringPrimaryKeyIngestPosition("hi", "jk").toString(), 
is("s,hi,jk"));
-    }
-    
-    @Test
-    void assertToStringWithNullValue() {
-        assertThat(new StringPrimaryKeyIngestPosition(null, null).toString(), 
is("s,,"));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
deleted file mode 100644
index 5bc5fc51afc..00000000000
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type;
-
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class UnsupportedKeyIngestPositionTest {
-    
-    @Test
-    void assertToString() {
-        assertThat(new UnsupportedKeyIngestPosition().toString(), is("u,,"));
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
index 723803a9def..4eaf3a1b8f6 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/TransmissionJobItemProgressTest.java
@@ -17,11 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.progress;
 
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlTransmissionJobItemProgress;
@@ -69,7 +68,7 @@ class TransmissionJobItemProgressTest {
         
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds0.t_1#1"), 
isA(IngestFinishedPosition.class));
         
assertThat(actual.getInventory().getInventoryPosition("t_1").get("ds1.t_1#1"), 
isA(IngestPlaceholderPosition.class));
         
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds0.t_2#2"), 
isA(IngestFinishedPosition.class));
-        
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"), 
isA(IntegerPrimaryKeyIngestPosition.class));
+        
assertThat(actual.getInventory().getInventoryPosition("t_2").get("ds1.t_2#2"), 
isA(UniqueKeyIngestPosition.class));
     }
     
     @Test
@@ -85,21 +84,21 @@ class TransmissionJobItemProgressTest {
     @Test
     void assertGetProgressesCorrectly() {
         Map<String, InventoryTaskProgress> progresses = new HashMap<>(4, 1F);
-        progresses.put("ds.order_item#0", new InventoryTaskProgress(new 
IntegerPrimaryKeyIngestPosition(BigInteger.ONE, BigInteger.valueOf(100L))));
-        progresses.put("ds.order_item#1", new InventoryTaskProgress(new 
UnsupportedKeyIngestPosition()));
+        progresses.put("ds.order_item#0", new 
InventoryTaskProgress(UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.ONE,
 BigInteger.valueOf(100L)))));
+        progresses.put("ds.order_item#1", new 
InventoryTaskProgress(UniqueKeyIngestPosition.ofUnsplit()));
         progresses.put("ds.order#0", new InventoryTaskProgress(new 
IngestFinishedPosition()));
-        progresses.put("ds.test_order#0", new InventoryTaskProgress(new 
StringPrimaryKeyIngestPosition("1", "100")));
+        progresses.put("ds.test_order#0", new 
InventoryTaskProgress(UniqueKeyIngestPosition.ofString(Range.closed("1", 
"100"))));
         JobItemInventoryTasksProgress progress = new 
JobItemInventoryTasksProgress(progresses);
         Map<String, IngestPosition> orderPosition = 
progress.getInventoryPosition("order");
         assertThat(orderPosition.size(), is(1));
         assertThat(orderPosition.get("ds.order#0"), 
isA(IngestFinishedPosition.class));
         Map<String, IngestPosition> testOrderPosition = 
progress.getInventoryPosition("test_order");
         assertThat(testOrderPosition.size(), is(1));
-        assertThat(testOrderPosition.get("ds.test_order#0"), 
isA(StringPrimaryKeyIngestPosition.class));
+        assertThat(testOrderPosition.get("ds.test_order#0"), 
isA(UniqueKeyIngestPosition.class));
         Map<String, IngestPosition> orderItemPosition = 
progress.getInventoryPosition("order_item");
         assertThat(orderItemPosition.size(), is(2));
-        assertThat(orderItemPosition.get("ds.order_item#0"), 
isA(IntegerPrimaryKeyIngestPosition.class));
-        assertThat(orderItemPosition.get("ds.order_item#1"), 
isA(UnsupportedKeyIngestPosition.class));
+        assertThat(orderItemPosition.get("ds.order_item#0"), 
isA(UniqueKeyIngestPosition.class));
+        assertThat(orderItemPosition.get("ds.order_item#1"), 
isA(UniqueKeyIngestPosition.class));
     }
     
     private TransmissionJobItemProgress getJobItemProgress(final String data) {
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
index b88e1fa59ec..c4eca31b6e1 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/InventoryDataSparsenessCalculatorTest.java
@@ -29,11 +29,15 @@ class InventoryDataSparsenessCalculatorTest {
     
     @Test
     void assertIsIntegerUniqueKeyDataSparse() {
-        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L, 
Range.closed(BigInteger.ONE, BigInteger.valueOf(1000000L))));
-        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 Range.closed(null, BigInteger.valueOf(1000000L))));
-        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 Range.closed(BigInteger.ONE, null)));
-        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 Range.closed(BigInteger.ONE, BigInteger.valueOf(1494000L))));
-        
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 Range.closed(BigInteger.ONE, BigInteger.valueOf(1495000L))));
-        
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 Range.closed(BigInteger.ONE, BigInteger.valueOf(1500000L))));
+        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1L, 
createRange(1L, 1000000L)));
+        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 createRange(null, 1000000L)));
+        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 createRange(1L, null)));
+        
assertFalse(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 createRange(1L, 1494000L)));
+        
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 createRange(1L, 1495000L)));
+        
assertTrue(InventoryDataSparsenessCalculator.isIntegerUniqueKeyDataSparse(1000000L,
 createRange(1L, 1500000L)));
+    }
+    
+    private Range<BigInteger> createRange(final Long lowerBound, final Long 
upperBound) {
+        return Range.closed(null == lowerBound ? null : 
BigInteger.valueOf(lowerBound), null == upperBound ? null : 
BigInteger.valueOf(upperBound));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
index 4e1d5a3d01e..743f862e7ff 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/estimated/InventoryPositionEstimatedCalculatorTest.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigInteger;
@@ -35,39 +35,39 @@ class InventoryPositionEstimatedCalculatorTest {
     void assertGetIntegerPositions() {
         List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
Range.closed(BigInteger.ONE, BigInteger.valueOf(600L)), 100L);
         assertThat(actualPositions.size(), is(2));
-        assertPosition(actualPositions.get(0), createIntegerPosition(1L, 
300L));
-        assertPosition(actualPositions.get(1), createIntegerPosition(301L, 
600L));
+        assertIntegerPosition(actualPositions.get(0), 
createIntegerPosition(1L, 300L));
+        assertIntegerPosition(actualPositions.get(1), 
createIntegerPosition(301L, 600L));
     }
     
-    private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long 
beginValue, final long endValue) {
-        return new 
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue), 
BigInteger.valueOf(endValue));
+    private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final 
long lowerBound, final long upperBound) {
+        return 
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound), 
BigInteger.valueOf(upperBound)));
     }
     
-    private void assertPosition(final IngestPosition actual, final 
IntegerPrimaryKeyIngestPosition expected) {
-        assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
-        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(), 
is(expected.getBeginValue()));
-        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(), 
is(expected.getEndValue()));
+    private void assertIntegerPosition(final IngestPosition actual, final 
UniqueKeyIngestPosition<BigInteger> expected) {
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        assertThat(((UniqueKeyIngestPosition<?>) actual).getLowerBound(), 
is(expected.getLowerBound()));
+        assertThat(((UniqueKeyIngestPosition<?>) actual).getUpperBound(), 
is(expected.getUpperBound()));
     }
     
     @Test
     void assertGetIntegerPositionsWithZeroTotalRecordsCount() {
         List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(0L, 
Range.closed(BigInteger.ZERO, BigInteger.ONE), 1L);
         assertThat(actualPositions.size(), is(1));
-        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(null, null));
+        assertIntegerPosition(actualPositions.get(0), 
UniqueKeyIngestPosition.ofInteger(Range.closed(null, null)));
     }
     
     @Test
     void assertGetIntegerPositionsWithNullValue() {
         List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
Range.closed(null, null), 1L);
         assertThat(actualPositions.size(), is(1));
-        assertPosition(actualPositions.get(0), new 
IntegerPrimaryKeyIngestPosition(null, null));
+        assertIntegerPosition(actualPositions.get(0), 
UniqueKeyIngestPosition.ofInteger(Range.closed(null, null)));
     }
     
     @Test
     void assertGetIntegerPositionsWithTheSameMinMax() {
         List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(200L, 
Range.closed(BigInteger.valueOf(5L), BigInteger.valueOf(5L)), 100L);
         assertThat(actualPositions.size(), is(1));
-        assertPosition(actualPositions.get(0), createIntegerPosition(5L, 5L));
+        assertIntegerPosition(actualPositions.get(0), 
createIntegerPosition(5L, 5L));
     }
     
     @Test
@@ -78,7 +78,7 @@ class InventoryPositionEstimatedCalculatorTest {
         BigInteger upperBound = BigInteger.valueOf(Long.MAX_VALUE);
         List<IngestPosition> actualPositions = 
InventoryPositionEstimatedCalculator.getIntegerPositions(tableRecordsCount, 
Range.closed(lowerBound, upperBound), shardingSize);
         assertThat(actualPositions.size(), is(2));
-        assertPosition(actualPositions.get(0), 
createIntegerPosition(lowerBound.longValue(), 0L));
-        assertPosition(actualPositions.get(1), createIntegerPosition(1L, 
upperBound.longValue()));
+        assertIntegerPosition(actualPositions.get(0), 
createIntegerPosition(lowerBound.longValue(), 0L));
+        assertIntegerPosition(actualPositions.get(1), 
createIntegerPosition(1L, upperBound.longValue()));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 5ee5d4aed7e..732fa3ea9c0 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -21,7 +21,8 @@ import com.google.protobuf.Int64Value;
 import com.google.protobuf.InvalidProtocolBufferException;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -39,7 +40,7 @@ class DataRecordResultConvertUtilsTest {
     @ParameterizedTest
     @MethodSource("dataChangeTypeTestCases")
     void assertConvertDataRecordToRecordWithNonInsertTypes(final 
PipelineSQLOperationType operationType, final Record.DataChangeType 
expectedDataChangeType) throws InvalidProtocolBufferException {
-        DataRecord dataRecord = new DataRecord(operationType, "test_schema", 
"t_user", new IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(5L), 
BigInteger.valueOf(10L)), 1);
+        DataRecord dataRecord = new DataRecord(operationType, "test_schema", 
"t_user", 
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(5L), 
BigInteger.valueOf(10L))), 1);
         dataRecord.addColumn(new NormalColumn("id", 1L, 2L, true, true));
         dataRecord.setCommitTime(123L);
         Record actualRecord = 
DataRecordResultConvertUtils.convertDataRecordToRecord("logic_db", 
"test_schema", dataRecord);
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 0825de76307..f1e70f0c64d 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -18,8 +18,8 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
 
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -65,8 +65,8 @@ class ConsistencyCheckJobItemContextTest {
                 new TableCheckRangePosition(1, DATA_NODE, TABLE, 
createIntegerPosition(101L, 200L), createIntegerPosition(101L, 203L), null, 
132, 132, false, null));
     }
     
-    private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long 
beginValue, final long endValue) {
-        return new 
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue), 
BigInteger.valueOf(endValue));
+    private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final 
long lowerBound, final long upperBound) {
+        return 
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound), 
BigInteger.valueOf(upperBound)));
     }
     
     private void assertTableCheckRangePosition(final TableCheckRangePosition 
actual, final TableCheckRangePosition expected) {
@@ -80,10 +80,10 @@ class ConsistencyCheckJobItemContextTest {
         assertThat(actual.isFinished(), is(expected.isFinished()));
     }
     
-    private void assertRange(final PrimaryKeyIngestPosition<?> actual, final 
PrimaryKeyIngestPosition<?> expected) {
+    private void assertRange(final UniqueKeyIngestPosition<?> actual, final 
UniqueKeyIngestPosition<?> expected) {
         assertThat(actual.getClass(), is(expected.getClass()));
-        assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
-        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(), 
is(((IntegerPrimaryKeyIngestPosition) expected).getBeginValue()));
-        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(), 
is(((IntegerPrimaryKeyIngestPosition) expected).getEndValue()));
+        assertThat(actual, instanceOf(UniqueKeyIngestPosition.class));
+        assertThat(actual.getLowerBound(), is(expected.getLowerBound()));
+        assertThat(actual.getUpperBound(), is(expected.getUpperBound()));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index bc9b35f1b90..21257453936 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -34,7 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -136,7 +136,7 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
             for (JobDataNodeEntry entry : each.getEntries()) {
                 for (DataNode dataNode : entry.getDataNodes()) {
                     result.add(new TableCheckRangePosition(splittingItem++, 
dataNode.format(), entry.getLogicTableName(),
-                            new UnsupportedKeyIngestPosition(), new 
UnsupportedKeyIngestPosition(), null));
+                            UniqueKeyIngestPosition.ofUnsplit(), 
UniqueKeyIngestPosition.ofUnsplit(), null));
                 }
             }
         }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 47640a16556..4175d57b31e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -33,7 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -194,7 +194,7 @@ class CDCE2EIT {
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
         ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0, 
sourceDataSource.getDatabaseType().getType());
         progressContext.getTableCheckRangePositions().add(new 
TableCheckRangePosition(0, null, qualifiedTable.getTableName(),
-                new UnsupportedKeyIngestPosition(), new 
UnsupportedKeyIngestPosition(), null));
+                UniqueKeyIngestPosition.ofUnsplit(), 
UniqueKeyIngestPosition.ofUnsplit(), null));
         TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
qualifiedTable, qualifiedTable,
                 tableMetaData.getColumnNames(), uniqueKeys, null, 
progressContext);
         TableDataConsistencyChecker tableChecker = 
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new 
Properties());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
index 4d3a1ad3591..5e1af738d5f 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryIntegerPositionExactCalculatorTest.java
@@ -19,8 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
 
 import org.apache.commons.text.RandomStringGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -85,29 +86,29 @@ class InventoryIntegerPositionExactCalculatorTest {
     void assertGetPositionsWithOrderIdUniqueKey() {
         List<IngestPosition> actual = 
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, 
"t_order"), "order_id", 3, dataSource, new IntegerPositionHandler());
         assertThat(actual.size(), is(4));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), 
createIntegerPosition(1L, 3L));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), 
createIntegerPosition(4L, 6L));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(2), 
createIntegerPosition(7L, 9L));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(3), 
createIntegerPosition(10L, 11L));
+        assertIntegerPosition0(actual.get(0), createIntegerPosition(1L, 3L));
+        assertIntegerPosition0(actual.get(1), createIntegerPosition(4L, 6L));
+        assertIntegerPosition0(actual.get(2), createIntegerPosition(7L, 9L));
+        assertIntegerPosition0(actual.get(3), createIntegerPosition(10L, 11L));
     }
     
-    private IntegerPrimaryKeyIngestPosition createIntegerPosition(final long 
beginValue, final long endValue) {
-        return new 
IntegerPrimaryKeyIngestPosition(BigInteger.valueOf(beginValue), 
BigInteger.valueOf(endValue));
+    private UniqueKeyIngestPosition<BigInteger> createIntegerPosition(final 
long lowerBound, final long upperBound) {
+        return 
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.valueOf(lowerBound), 
BigInteger.valueOf(upperBound)));
     }
     
-    private void assertIntegerPrimaryKeyIngestPosition0(final IngestPosition 
actual, final IntegerPrimaryKeyIngestPosition expected) {
-        assertThat(actual, isA(IntegerPrimaryKeyIngestPosition.class));
-        IntegerPrimaryKeyIngestPosition position = 
(IntegerPrimaryKeyIngestPosition) actual;
+    private void assertIntegerPosition0(final IngestPosition actual, final 
UniqueKeyIngestPosition<BigInteger> expected) {
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        UniqueKeyIngestPosition<?> position = (UniqueKeyIngestPosition<?>) 
actual;
         assertThat(position.getType(), is(expected.getType()));
-        assertThat(position.getBeginValue(), is(expected.getBeginValue()));
-        assertThat(position.getEndValue(), is(expected.getEndValue()));
+        assertThat(position.getLowerBound(), is(expected.getLowerBound()));
+        assertThat(position.getUpperBound(), is(expected.getUpperBound()));
     }
     
     @Test
     void assertGetPositionsWithMultiColumnUniqueKeys() {
         List<IngestPosition> actual = 
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, 
"t_order"), "user_id", 3, dataSource, new IntegerPositionHandler());
         assertThat(actual.size(), is(2));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(0), 
createIntegerPosition(1L, 3L));
-        assertIntegerPrimaryKeyIngestPosition0(actual.get(1), 
createIntegerPosition(4L, 6L));
+        assertIntegerPosition0(actual.get(0), createIntegerPosition(1L, 3L));
+        assertIntegerPosition0(actual.get(1), createIntegerPosition(4L, 6L));
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
index 60ab751c35b..17e40cf16eb 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryStringPositionExactCalculatorTest.java
@@ -19,8 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculat
 
 import org.apache.commons.text.RandomStringGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.StringPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.util.DataSourceTestUtils;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -84,25 +85,29 @@ class InventoryStringPositionExactCalculatorTest {
     void assertGetPositionsWithOrderIdUniqueKey() {
         List<IngestPosition> actual = 
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, 
"t_order"), "order_id", 3, dataSource, new StringPositionHandler());
         assertThat(actual.size(), is(4));
-        assertStringPrimaryKeyIngestPosition0(actual.get(0), new 
StringPrimaryKeyIngestPosition("a", "c"));
-        assertStringPrimaryKeyIngestPosition0(actual.get(1), new 
StringPrimaryKeyIngestPosition("d", "f"));
-        assertStringPrimaryKeyIngestPosition0(actual.get(2), new 
StringPrimaryKeyIngestPosition("g", "i"));
-        assertStringPrimaryKeyIngestPosition0(actual.get(3), new 
StringPrimaryKeyIngestPosition("j", "k"));
+        assertStringPosition0(actual.get(0), createStringPosition("a", "c"));
+        assertStringPosition0(actual.get(1), createStringPosition("d", "f"));
+        assertStringPosition0(actual.get(2), createStringPosition("g", "i"));
+        assertStringPosition0(actual.get(3), createStringPosition("j", "k"));
     }
     
-    private void assertStringPrimaryKeyIngestPosition0(final IngestPosition 
actual, final StringPrimaryKeyIngestPosition expected) {
-        assertThat(actual, isA(StringPrimaryKeyIngestPosition.class));
-        StringPrimaryKeyIngestPosition position = 
(StringPrimaryKeyIngestPosition) actual;
+    private UniqueKeyIngestPosition<String> createStringPosition(final String 
lowerBound, final String upperBound) {
+        return UniqueKeyIngestPosition.ofString(Range.closed(lowerBound, 
upperBound));
+    }
+    
+    private void assertStringPosition0(final IngestPosition actual, final 
UniqueKeyIngestPosition<String> expected) {
+        assertThat(actual, isA(UniqueKeyIngestPosition.class));
+        UniqueKeyIngestPosition<?> position = (UniqueKeyIngestPosition<?>) 
actual;
         assertThat(position.getType(), is(expected.getType()));
-        assertThat(position.getBeginValue(), is(expected.getBeginValue()));
-        assertThat(position.getEndValue(), is(expected.getEndValue()));
+        assertThat(position.getLowerBound(), is(expected.getLowerBound()));
+        assertThat(position.getUpperBound(), is(expected.getUpperBound()));
     }
     
     @Test
     void assertGetPositionsWithMultiColumnUniqueKeys() {
         List<IngestPosition> actual = 
InventoryPositionExactCalculator.getPositions(new QualifiedTable(null, 
"t_order"), "user_id", 3, dataSource, new StringPositionHandler());
         assertThat(actual.size(), is(2));
-        assertStringPrimaryKeyIngestPosition0(actual.get(0), new 
StringPrimaryKeyIngestPosition("a", "c"));
-        assertStringPrimaryKeyIngestPosition0(actual.get(1), new 
StringPrimaryKeyIngestPosition("d", "f"));
+        assertStringPosition0(actual.get(0), createStringPosition("a", "c"));
+        assertStringPosition0(actual.get(1), createStringPosition("d", "f"));
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
index 051a6f5ad5c..a78701949d5 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitterTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
@@ -90,8 +90,8 @@ class InventoryTaskSplitterTest {
         List<InventoryTask> actual = 
inventoryTaskSplitter.split(jobItemContext);
         assertThat(actual.size(), is(1));
         InventoryTask task = actual.get(0);
-        assertNull(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getBeginValue());
-        assertNull(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getEndValue());
+        assertNull(((UniqueKeyIngestPosition<?>) 
task.getTaskProgress().getPosition()).getLowerBound());
+        assertNull(((UniqueKeyIngestPosition<?>) 
task.getTaskProgress().getPosition()).getUpperBound());
     }
     
     @Test
@@ -100,8 +100,8 @@ class InventoryTaskSplitterTest {
         List<InventoryTask> actual = 
inventoryTaskSplitter.split(jobItemContext);
         assertThat(actual.size(), is(10));
         InventoryTask task = actual.get(9);
-        assertThat(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getBeginValue(), 
is(BigInteger.valueOf(91L)));
-        assertThat(((IntegerPrimaryKeyIngestPosition) 
task.getTaskProgress().getPosition()).getEndValue(), 
is(BigInteger.valueOf(100L)));
+        assertThat(((UniqueKeyIngestPosition<?>) 
task.getTaskProgress().getPosition()).getLowerBound(), 
is(BigInteger.valueOf(91L)));
+        assertThat(((UniqueKeyIngestPosition<?>) 
task.getTaskProgress().getPosition()).getUpperBound(), 
is(BigInteger.valueOf(100L)));
     }
     
     @Test
@@ -110,9 +110,9 @@ class InventoryTaskSplitterTest {
         List<InventoryTask> actual = 
inventoryTaskSplitter.split(jobItemContext);
         assertThat(actual.size(), is(1));
         assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0"));
-        IntegerPrimaryKeyIngestPosition keyPosition = 
(IntegerPrimaryKeyIngestPosition) actual.get(0).getTaskProgress().getPosition();
-        assertThat(keyPosition.getBeginValue(), is(BigInteger.ONE));
-        assertThat(keyPosition.getEndValue(), is(BigInteger.valueOf(999L)));
+        UniqueKeyIngestPosition<?> keyPosition = (UniqueKeyIngestPosition<?>) 
actual.get(0).getTaskProgress().getPosition();
+        assertThat(keyPosition.getLowerBound(), is(BigInteger.ONE));
+        assertThat(keyPosition.getUpperBound(), is(BigInteger.valueOf(999L)));
     }
     
     @Test
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 030aec608cc..ea9e539aa8a 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,8 +23,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.Range;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -79,7 +80,7 @@ class InventoryTaskTest {
         InventoryTask inventoryTask = new 
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext),
                 PipelineContextUtils.getExecuteEngine(), 
PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), 
mock(Importer.class), position);
         CompletableFuture.allOf(inventoryTask.start().toArray(new 
CompletableFuture[0])).get(10L, TimeUnit.SECONDS);
-        assertThat(inventoryTask.getTaskProgress().getPosition(), 
isA(IntegerPrimaryKeyIngestPosition.class));
+        assertThat(inventoryTask.getTaskProgress().getPosition(), 
isA(UniqueKeyIngestPosition.class));
     }
     
     private void initTableData(final IncrementalDumperContext dumperContext) 
throws SQLException {
@@ -101,7 +102,7 @@ class InventoryTaskTest {
         result.setActualTableName(actualTableName);
         
result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData()));
         result.getCommonContext().setPosition(null == 
taskConfig.getDumperContext().getCommonContext().getPosition()
-                ? new IntegerPrimaryKeyIngestPosition(BigInteger.ONE, 
BigInteger.valueOf(1000L))
+                ? 
UniqueKeyIngestPosition.ofInteger(Range.closed(BigInteger.ONE, 
BigInteger.valueOf(1000L)))
                 : 
taskConfig.getDumperContext().getCommonContext().getPosition());
         return result;
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index 6e2d2a35607..dd50df7332c 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.UniqueKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -49,8 +49,8 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.Mockito.mock;
 
@@ -101,8 +101,8 @@ class ConsistencyCheckJobExecutorCallbackTest {
         result.setSplittingItem(0);
         result.setSourceDataNode("ds_0.t_order");
         result.setLogicTableName("t_order");
-        result.setSourceRange(new UnsupportedKeyIngestPosition().toString());
-        result.setTargetRange(new UnsupportedKeyIngestPosition().toString());
+        result.setSourceRange(UniqueKeyIngestPosition.ofUnsplit().encode());
+        result.setTargetRange(UniqueKeyIngestPosition.ofUnsplit().encode());
         result.setSourcePosition(100);
         result.setTargetPosition(100);
         return result;

Reply via email to