This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a2a267161d Extract InventoryPositionExactCalculator (#37508)
0a2a267161d is described below

commit 0a2a267161d35f1d636f37dae0a5604b71c531c6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 15:06:11 2025 +0800

    Extract InventoryPositionExactCalculator (#37508)
    
    * Add DialectPipelineSQLBuilder.buildSplitByUniqueKeyRangedSubqueryClause method and impls
    
    * Add InventoryPositionExactCalculator and DataTypePositionHandler
---
 .../position/exact/DataTypePositionHandler.java    |  61 ++++++++++
 .../exact/InventoryPositionExactCalculator.java    | 127 +++++++++++++++++++++
 .../dialect/DialectPipelineSQLBuilder.java         |  10 ++
 .../sqlbuilder/sql/PipelinePrepareSQLBuilder.java  |  15 +++
 .../fixture/FixturePipelineSQLBuilder.java         |   5 +
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |   7 ++
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |   8 ++
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |   7 ++
 .../OpenGaussPipelineSQLBuilderTest.java           |   8 ++
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |   7 ++
 .../PostgreSQLPipelineSQLBuilderTest.java          |   8 ++
 .../h2/sqlbuilder/H2PipelineSQLBuilder.java        |   7 ++
 12 files changed, 270 insertions(+)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
new file mode 100644
index 00000000000..3cabd8ac15c
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Data type position handler.
+ *
+ * <p>Adapts one unique key data type for inventory position calculation: reading a key value from a result set,
+ * binding a key value to a prepared statement, and creating the ingest position of a key range.</p>
+ *
+ * @param <T> data type
+ */
+public interface DataTypePositionHandler<T> {
+    
+    /**
+     * Read column value from result set.
+     *
+     * <p>InventoryPositionExactCalculator calls this after a successful {@code ResultSet.next()},
+     * with column index 1 for the range maximum and 3 for the range minimum.</p>
+     *
+     * @param resultSet result set
+     * @param columnIndex column index
+     * @return column value
+     * @throws SQLException SQL exception
+     */
+    T readColumnValue(ResultSet resultSet, int columnIndex) throws 
SQLException;
+    
+    /**
+     * Set prepared statement value.
+     *
+     * <p>InventoryPositionExactCalculator uses this to bind the previous range's upper value as the lower bound of the next query.</p>
+     *
+     * @param preparedStatement prepared statement
+     * @param parameterIndex parameter index
+     * @param value value
+     * @throws SQLException SQL exception
+     */
+    void setPreparedStatementValue(PreparedStatement preparedStatement, int 
parameterIndex, T value) throws SQLException;
+    
+    /**
+     * Create ingest position.
+     *
+     * @param lowerValue lower value; InventoryPositionExactCalculator passes {@code null} when the first query finds no records
+     * @param upperValue upper value; InventoryPositionExactCalculator passes {@code null} when the first query finds no records
+     * @return ingest position
+     */
+    PrimaryKeyIngestPosition<T> createIngestPosition(T lowerValue, T 
upperValue);
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
new file mode 100644
index 00000000000..b392abd73e1
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Inventory position exact calculator.
+ *
+ * <p>Splits a table into consecutive unique key ranges by repeatedly executing the ranged split SQL built by
+ * {@link PipelinePrepareSQLBuilder}, binding the sharding size to each query and creating one ingest position per range.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class InventoryPositionExactCalculator {
+    
+    /**
+     * Get positions by table unique key range.
+     *
+     * <p>The result always starts with the position of the first range, which is created from {@code null} bounds when the first query finds no records.</p>
+     *
+     * @param <T> type of unique key
+     * @param qualifiedTable qualified table
+     * @param uniqueKey unique key
+     * @param shardingSize sharding size
+     * @param sourceDataSource source data source
+     * @param positionHandler position handler
+     * @return positions
+     * @throws SplitPipelineJobByUniqueKeyException if an error occurs while splitting table by unique key
+     */
+    public static <T> List<IngestPosition> getPositions(final QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+                                                        final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        List<IngestPosition> result = new LinkedList<>();
+        PrimaryKeyIngestPosition<T> firstPosition = getFirstPosition(qualifiedTable, uniqueKey, shardingSize, sourceDataSource, positionHandler);
+        result.add(firstPosition);
+        result.addAll(getLeftPositions(qualifiedTable, uniqueKey, shardingSize, firstPosition, sourceDataSource, positionHandler));
+        return result;
+    }
+    
+    /**
+     * Query the first range, which has no lower bound.
+     *
+     * @return position of the first range, or a position created from {@code null} bounds when no records are found
+     */
+    private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+                                                                    final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        String firstQuerySQL = new PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+                .buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), qualifiedTable.getTableName(), uniqueKey, false);
+        try (
+                Connection connection = sourceDataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(firstQuerySQL)) {
+            preparedStatement.setLong(1, shardingSize);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (!resultSet.next()) {
+                    log.info("No any record, return. First query SQL: {}", firstQuerySQL);
+                    return positionHandler.createIngestPosition(null, null);
+                }
+                // Result columns of the split SQL: 1 = MAX(unique key), 2 = COUNT(1), 3 = MIN(unique key).
+                long count = resultSet.getLong(2);
+                if (0 == count) {
+                    // Log only when the range is really empty; previously this message was emitted for every table.
+                    log.info("First records count is 0, return. First query SQL: {}", firstQuerySQL);
+                    return positionHandler.createIngestPosition(null, null);
+                }
+                T minValue = positionHandler.readColumnValue(resultSet, 3);
+                T maxValue = positionHandler.readColumnValue(resultSet, 1);
+                return positionHandler.createIngestPosition(minValue, maxValue);
+            }
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey, ex);
+        }
+    }
+    
+    /**
+     * Query the ranges after the first one, each starting strictly after the previous range's upper value.
+     *
+     * @return positions of the remaining ranges, empty when the first position has no upper value
+     */
+    private static <T> List<IngestPosition> getLeftPositions(final QualifiedTable qualifiedTable, final String uniqueKey,
+                                                             final int shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
+                                                             final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        List<IngestPosition> result = new LinkedList<>();
+        T lowerValue = firstPosition.getEndValue();
+        if (null == lowerValue) {
+            // Nothing to continue from: a null bound cannot match any key in the ">" comparison, and the handler is not required to accept null.
+            return result;
+        }
+        long recordsCount = 0;
+        String laterQuerySQL = new PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+                .buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), qualifiedTable.getTableName(), uniqueKey, true);
+        try (
+                Connection connection = sourceDataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(laterQuerySQL)) {
+            // The same statement is re-executed for every range; only the lower bound changes between iterations.
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                positionHandler.setPreparedStatementValue(preparedStatement, 1, lowerValue);
+                preparedStatement.setLong(2, shardingSize);
+                try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                    if (!resultSet.next()) {
+                        break;
+                    }
+                    T maxValue = positionHandler.readColumnValue(resultSet, 1);
+                    long count = resultSet.getLong(2);
+                    if (0 == count) {
+                        log.info("Done. Later records count: {}, later query SQL: {}, last lower value: {}", recordsCount, laterQuerySQL, lowerValue);
+                        break;
+                    }
+                    recordsCount += count;
+                    T minValue = positionHandler.readColumnValue(resultSet, 3);
+                    result.add(positionHandler.createIngestPosition(minValue, maxValue));
+                    // The next range starts strictly after this range's maximum.
+                    lowerValue = maxValue;
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey, ex);
+        }
+        return result;
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index 45211650cfe..d1f843cba5d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -82,6 +82,16 @@ public interface DialectPipelineSQLBuilder extends 
DatabaseTypedSPI {
         return Optional.empty();
     }
     
+    /**
+     * Build split by unique key subquery clause.
+     *
+     * <p>PipelinePrepareSQLBuilder wraps the returned clause as {@code SELECT MAX(..), COUNT(1), MIN(..) FROM (clause) t}
+     * and passes an already escaped unique key. InventoryPositionExactCalculator binds the sharding size as the only
+     * parameter when there is no lower value, and binds the lower value first and the sharding size second otherwise.</p>
+     *
+     * @param qualifiedTableName qualified table name
+     * @param uniqueKey unique key
+     * @param hasLowerValue has lower value
+     * @return built SQL
+     */
+    String buildSplitByUniqueKeyRangedSubqueryClause(String 
qualifiedTableName, String uniqueKey, boolean hasLowerValue);
+    
     /**
      * Build create table SQLs.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
index e3185e21f37..cdf64123999 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
@@ -105,4 +105,19 @@ public final class PipelinePrepareSQLBuilder {
     public String buildCheckEmptyTableSQL(final String schemaName, final 
String tableName) {
         return 
dialectSQLBuilder.buildCheckEmptyTableSQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
 tableName));
     }
+    
+    /**
+     * Build split by unique key ranged SQL.
+     *
+     * <p>The dialect-specific ranged subquery is wrapped in an aggregate that selects, in this order,
+     * the maximum unique key, the record count and the minimum unique key of the range.</p>
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key
+     * @param hasLowerValue has lower value
+     * @return split SQL
+     */
+    public String buildSplitByUniqueKeyRangedSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean hasLowerValue) {
+        String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String rangedSubquery = dialectSQLBuilder.buildSplitByUniqueKeyRangedSubqueryClause(qualifiedTableName, quotedUniqueKey, hasLowerValue);
+        return String.format("SELECT MAX(%s), COUNT(1), MIN(%s) FROM (%s) t", quotedUniqueKey, quotedUniqueKey, rangedSubquery);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 5dcc1929350..3f7304d1a5f 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -36,6 +36,11 @@ public final class FixturePipelineSQLBuilder implements 
DialectPipelineSQLBuilde
         return Optional.of(String.format("SELECT CRC32(%s) FROM %s", 
columnName, qualifiedTableName));
     }
     
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String 
qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        // Fixture stub: returns an empty clause regardless of the arguments.
+        return "";
+    }
+    
     @Override
     public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) {
         return Collections.emptyList();
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 817ff299c58..c2d59d3cf30 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -64,6 +64,13 @@ public final class MySQLPipelineSQLBuilder implements 
DialectPipelineSQLBuilder
         return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", columnName, 
qualifiedTableName));
     }
     
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        // Without a lower value the scan starts at the smallest key; otherwise only keys strictly greater than the bound parameter are read.
+        if (!hasLowerValue) {
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+    
     @Override
     public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) throws 
SQLException {
         try (
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index c976bb647b5..85000b830c0 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -83,6 +83,14 @@ class MySQLPipelineSQLBuilderTest {
         assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM foo_tbl"));
     }
     
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // Ranged variant: expects the lower bound placeholder before the limit placeholder.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Unbounded variant: expects the limit placeholder only.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+    
     @Test
     void assertBuildCreateTableSQLs() throws SQLException {
         Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 11f31994fa2..ddbec207cc8 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -62,6 +62,13 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
         return Optional.of(String.format("SELECT reltuples::integer FROM 
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
     
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        // Without a lower value the scan starts at the smallest key; otherwise only keys strictly greater than the bound parameter are read.
+        if (!hasLowerValue) {
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+    
     @Override
     public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) throws 
SQLException {
         try (
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 9db016c9eda..a9e933ed704 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -81,6 +81,14 @@ class OpenGaussPipelineSQLBuilderTest {
         assertThat(actual.get(), is("SELECT reltuples::integer FROM pg_class 
WHERE oid='foo_tbl'::regclass::oid;"));
     }
     
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // Ranged variant: expects the lower bound placeholder before the limit placeholder.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Unbounded variant: expects the limit placeholder only.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+    
     @SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
     @Test
     void assertBuildCreateTableSQLs() throws SQLException {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 800cdf6b3e2..f666842eb89 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -78,6 +78,13 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
         return Optional.of(String.format("SELECT 
pg_catalog.pg_checksum_table('%s', true)", qualifiedTableName));
     }
     
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        // Without a lower value the scan starts at the smallest key; otherwise only keys strictly greater than the bound parameter are read.
+        if (!hasLowerValue) {
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+    
     @Override
     public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) throws 
SQLException {
         try (Connection connection = dataSource.getConnection()) {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index cf58094bfce..4c2cd5b9e86 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -113,6 +113,14 @@ class PostgreSQLPipelineSQLBuilderTest {
         assertThat(actual.get(), is("SELECT 
pg_catalog.pg_checksum_table('foo_tbl', true)"));
     }
     
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // Ranged variant: expects the lower bound placeholder before the limit placeholder.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Unbounded variant: expects the limit placeholder only.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+    
     @Test
     void assertBuildQueryCurrentPositionSQL() {
         Optional<String> actual = sqlBuilder.buildQueryCurrentPositionSQL();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e7a37ae5aef..31cd5f1fb39 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -36,6 +36,13 @@ public final class H2PipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
         return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
     }
     
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        // Without a lower value the scan starts at the smallest key; otherwise only keys strictly greater than the bound parameter are read.
+        if (!hasLowerValue) {
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+    
     @Override
     public Collection<String> buildCreateTableSQLs(final DataSource 
dataSource, final String schemaName, final String tableName) {
         
ShardingSpherePreconditions.checkState("t_order".equalsIgnoreCase(tableName), 
() -> new CreateTableSQLGenerateException(tableName));

Reply via email to