This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2a267161d Extract InventoryPositionExactCalculator (#37508)
0a2a267161d is described below
commit 0a2a267161d35f1d636f37dae0a5604b71c531c6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Dec 25 15:06:11 2025 +0800
Extract InventoryPositionExactCalculator (#37508)
* Add DialectPipelineSQLBuilder.buildSplitByUniqueKeyRangedSubqueryClause
method and impls
* Add InventoryPositionExactCalculator and DataTypePositionHandler
---
.../position/exact/DataTypePositionHandler.java | 61 ++++++++++
.../exact/InventoryPositionExactCalculator.java | 127 +++++++++++++++++++++
.../dialect/DialectPipelineSQLBuilder.java | 10 ++
.../sqlbuilder/sql/PipelinePrepareSQLBuilder.java | 15 +++
.../fixture/FixturePipelineSQLBuilder.java | 5 +
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 7 ++
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 8 ++
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 7 ++
.../OpenGaussPipelineSQLBuilderTest.java | 8 ++
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 7 ++
.../PostgreSQLPipelineSQLBuilderTest.java | 8 ++
.../h2/sqlbuilder/H2PipelineSQLBuilder.java | 7 ++
12 files changed, 270 insertions(+)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
new file mode 100644
index 00000000000..3cabd8ac15c
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/DataTypePositionHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Data type position handler.
+ *
+ * <p>Reads values of type {@code T} from a result set, binds them to a prepared statement,
+ * and builds an ingest position from a pair of bounds.</p>
+ *
+ * @param <T> data type
+ */
+public interface DataTypePositionHandler<T> {
+
+ /**
+ * Read column value from result set.
+ *
+ * @param resultSet result set
+ * @param columnIndex column index; {@code InventoryPositionExactCalculator} passes the JDBC indexes 1 and 3
+ * @return column value
+ * @throws SQLException SQL exception
+ */
+ T readColumnValue(ResultSet resultSet, int columnIndex) throws
SQLException;
+
+ /**
+ * Set prepared statement value.
+ *
+ * @param preparedStatement prepared statement
+ * @param parameterIndex parameter index
+ * @param value value to bind at the parameter index
+ * @throws SQLException SQL exception
+ */
+ void setPreparedStatementValue(PreparedStatement preparedStatement, int
parameterIndex, T value) throws SQLException;
+
+ /**
+ * Create ingest position.
+ *
+ * @param lowerValue lower value; {@code InventoryPositionExactCalculator} passes {@code null} when no records are found
+ * @param upperValue upper value; {@code InventoryPositionExactCalculator} passes {@code null} when no records are found
+ * @return ingest position
+ */
+ PrimaryKeyIngestPosition<T> createIngestPosition(T lowerValue, T
upperValue);
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
new file mode 100644
index 00000000000..b392abd73e1
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/calculator/position/exact/InventoryPositionExactCalculator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculator.position.exact;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Inventory position exact calculator.
+ *
+ * <p>Splits a table into consecutive unique key ranges by querying the maximum, count and minimum
+ * of the unique key for one batch of rows at a time, where the batch size is the sharding size.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.NONE)
+@Slf4j
+public final class InventoryPositionExactCalculator {
+
+    /**
+     * Get positions by table unique key range.
+     *
+     * <p>The result always starts with the first position. When the first query finds no records,
+     * that position is created from {@code null} bounds and it is the only element.</p>
+     *
+     * @param <T> type of unique key
+     * @param qualifiedTable qualified table
+     * @param uniqueKey unique key
+     * @param shardingSize sharding size, bound as the row limit parameter of every range query
+     * @param sourceDataSource source data source
+     * @param positionHandler position handler
+     * @return positions
+     * @throws SplitPipelineJobByUniqueKeyException if an error occurs while splitting table by unique key
+     */
+    public static <T> List<IngestPosition> getPositions(final QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+                                                        final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        List<IngestPosition> result = new LinkedList<>();
+        PrimaryKeyIngestPosition<T> firstPosition = getFirstPosition(qualifiedTable, uniqueKey, shardingSize, sourceDataSource, positionHandler);
+        result.add(firstPosition);
+        result.addAll(getLeftPositions(qualifiedTable, uniqueKey, shardingSize, firstPosition, sourceDataSource, positionHandler));
+        return result;
+    }
+
+    // Queries the first range without a lower bound; returns a position created from null bounds when no records are found.
+    private static <T> PrimaryKeyIngestPosition<T> getFirstPosition(final QualifiedTable qualifiedTable, final String uniqueKey, final int shardingSize,
+                                                                    final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        String firstQuerySQL = new PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+                .buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), qualifiedTable.getTableName(), uniqueKey, false);
+        try (
+                Connection connection = sourceDataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(firstQuerySQL)) {
+            preparedStatement.setLong(1, shardingSize);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                if (!resultSet.next()) {
+                    log.info("No any record, return. First query SQL: {}", firstQuerySQL);
+                    return positionHandler.createIngestPosition(null, null);
+                }
+                // Result columns: 1 = MAX(unique key), 2 = COUNT(1), 3 = MIN(unique key).
+                long count = resultSet.getLong(2);
+                if (0 == count) {
+                    // Log only when the count really is zero; logging before the check reported an empty table on every call.
+                    log.info("First records count is 0, return. First query SQL: {}", firstQuerySQL);
+                    return positionHandler.createIngestPosition(null, null);
+                }
+                T minValue = positionHandler.readColumnValue(resultSet, 3);
+                T maxValue = positionHandler.readColumnValue(resultSet, 1);
+                return positionHandler.createIngestPosition(minValue, maxValue);
+            }
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey, ex);
+        }
+    }
+
+    // Queries the ranges after the first position, each starting above the previous range's maximum key, until a query reports no records.
+    private static <T> List<IngestPosition> getLeftPositions(final QualifiedTable qualifiedTable, final String uniqueKey,
+                                                             final int shardingSize, final PrimaryKeyIngestPosition<T> firstPosition,
+                                                             final PipelineDataSource sourceDataSource, final DataTypePositionHandler<T> positionHandler) {
+        List<IngestPosition> result = new LinkedList<>();
+        T lowerValue = firstPosition.getEndValue();
+        if (null == lowerValue) {
+            // The first position has no end value, and a "key > NULL" filter can never match, so skip binding null and the extra query.
+            return result;
+        }
+        long recordsCount = 0;
+        String laterQuerySQL = new PipelinePrepareSQLBuilder(sourceDataSource.getDatabaseType())
+                .buildSplitByUniqueKeyRangedSQL(qualifiedTable.getSchemaName(), qualifiedTable.getTableName(), uniqueKey, true);
+        try (
+                Connection connection = sourceDataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(laterQuerySQL)) {
+            // The index is not read in the body; it only caps the number of range queries.
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                positionHandler.setPreparedStatementValue(preparedStatement, 1, lowerValue);
+                preparedStatement.setLong(2, shardingSize);
+                try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                    if (!resultSet.next()) {
+                        break;
+                    }
+                    // Result columns: 1 = MAX(unique key), 2 = COUNT(1), 3 = MIN(unique key).
+                    T maxValue = positionHandler.readColumnValue(resultSet, 1);
+                    // Read as long to match getFirstPosition and the long accumulator.
+                    long count = resultSet.getLong(2);
+                    if (0 == count) {
+                        log.info("Done. Later records count: {}, later query SQL: {}, last lower value: {}", recordsCount, laterQuerySQL, lowerValue);
+                        break;
+                    }
+                    recordsCount += count;
+                    T minValue = positionHandler.readColumnValue(resultSet, 3);
+                    result.add(positionHandler.createIngestPosition(minValue, maxValue));
+                    // The next range starts strictly above the maximum key of this range.
+                    lowerValue = maxValue;
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new SplitPipelineJobByUniqueKeyException(qualifiedTable.getTableName(), uniqueKey, ex);
+        }
+        return result;
+    }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index 45211650cfe..d1f843cba5d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -82,6 +82,16 @@ public interface DialectPipelineSQLBuilder extends
DatabaseTypedSPI {
return Optional.empty();
}
+ /**
+ * Build split by unique key ranged subquery clause.
+ *
+ * @param qualifiedTableName qualified table name
+ * @param uniqueKey unique key; {@code PipelinePrepareSQLBuilder} passes the already escaped identifier
+ * @param hasLowerValue has lower value; when true, the implementations in this change add a {@code key>?} filter placeholder
+ * @return built SQL, which {@code PipelinePrepareSQLBuilder} embeds as a derived table
+ */
+ String buildSplitByUniqueKeyRangedSubqueryClause(String
qualifiedTableName, String uniqueKey, boolean hasLowerValue);
+
/**
* Build create table SQLs.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
index e3185e21f37..cdf64123999 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelinePrepareSQLBuilder.java
@@ -105,4 +105,19 @@ public final class PipelinePrepareSQLBuilder {
public String buildCheckEmptyTableSQL(final String schemaName, final
String tableName) {
return
dialectSQLBuilder.buildCheckEmptyTableSQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName));
}
+
+    /**
+     * Build split by unique key ranged SQL.
+     *
+     * <p>The SQL selects the maximum unique key, the record count and the minimum unique key,
+     * in that column order, from the dialect specific ranged subquery.</p>
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @param uniqueKey unique key
+     * @param hasLowerValue has lower value
+     * @return split SQL
+     */
+    public String buildSplitByUniqueKeyRangedSQL(final String schemaName, final String tableName, final String uniqueKey, final boolean hasLowerValue) {
+        String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String rangedSubquery = dialectSQLBuilder.buildSplitByUniqueKeyRangedSubqueryClause(qualifiedTableName, quotedUniqueKey, hasLowerValue);
+        return String.format("SELECT MAX(%s), COUNT(1), MIN(%s) FROM (%s) t", quotedUniqueKey, quotedUniqueKey, rangedSubquery);
+    }
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
index 5dcc1929350..3f7304d1a5f 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
@@ -36,6 +36,11 @@ public final class FixturePipelineSQLBuilder implements
DialectPipelineSQLBuilde
return Optional.of(String.format("SELECT CRC32(%s) FROM %s",
columnName, qualifiedTableName));
}
+ @Override
+ public String buildSplitByUniqueKeyRangedSubqueryClause(final String
qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+ // Fixture implementation: always returns an empty clause, whatever the arguments are.
+ return "";
+ }
+
@Override
public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) {
return Collections.emptyList();
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 817ff299c58..c2d59d3cf30 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -64,6 +64,13 @@ public final class MySQLPipelineSQLBuilder implements
DialectPipelineSQLBuilder
return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", columnName,
qualifiedTableName));
}
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        if (!hasLowerValue) {
+            // First range: no lower bound, the only placeholder is the row limit.
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        // Later ranges: the first placeholder is the exclusive lower bound, the second is the row limit.
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+
@Override
public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
try (
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index c976bb647b5..85000b830c0 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -83,6 +83,14 @@ class MySQLPipelineSQLBuilderTest {
assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM foo_tbl"));
}
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // With a lower value the clause filters by the key placeholder before limiting.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Without a lower value only the limit placeholder remains.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+
@Test
void assertBuildCreateTableSQLs() throws SQLException {
Connection connection = mock(Connection.class, RETURNS_DEEP_STUBS);
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 11f31994fa2..ddbec207cc8 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -62,6 +62,13 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
return Optional.of(String.format("SELECT reltuples::integer FROM
pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
}
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        if (!hasLowerValue) {
+            // First range: no lower bound, the only placeholder is the row limit.
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        // Later ranges: the first placeholder is the exclusive lower bound, the second is the row limit.
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+
@Override
public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
try (
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
index 9db016c9eda..a9e933ed704 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilderTest.java
@@ -81,6 +81,14 @@ class OpenGaussPipelineSQLBuilderTest {
assertThat(actual.get(), is("SELECT reltuples::integer FROM pg_class
WHERE oid='foo_tbl'::regclass::oid;"));
}
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // With a lower value the clause filters by the key placeholder before limiting.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Without a lower value only the limit placeholder remains.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+
@SuppressWarnings("JDBCResourceOpenedButNotSafelyClosed")
@Test
void assertBuildCreateTableSQLs() throws SQLException {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 800cdf6b3e2..f666842eb89 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -78,6 +78,13 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
return Optional.of(String.format("SELECT
pg_catalog.pg_checksum_table('%s', true)", qualifiedTableName));
}
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        if (!hasLowerValue) {
+            // First range: no lower bound, the only placeholder is the row limit.
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        // Later ranges: the first placeholder is the exclusive lower bound, the second is the row limit.
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+
@Override
public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) throws
SQLException {
try (Connection connection = dataSource.getConnection()) {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
index cf58094bfce..4c2cd5b9e86 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilderTest.java
@@ -113,6 +113,14 @@ class PostgreSQLPipelineSQLBuilderTest {
assertThat(actual.get(), is("SELECT
pg_catalog.pg_checksum_table('foo_tbl', true)"));
}
+    @Test
+    void assertBuildSplitByUniqueKeyRangedSubqueryClause() {
+        // With a lower value the clause filters by the key placeholder before limiting.
+        String actualWithLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", true);
+        assertThat(actualWithLowerValue, is("SELECT id FROM foo_tbl WHERE id>? ORDER BY id LIMIT ?"));
+        // Without a lower value only the limit placeholder remains.
+        String actualWithoutLowerValue = sqlBuilder.buildSplitByUniqueKeyRangedSubqueryClause("foo_tbl", "id", false);
+        assertThat(actualWithoutLowerValue, is("SELECT id FROM foo_tbl ORDER BY id LIMIT ?"));
+    }
+
@Test
void assertBuildQueryCurrentPositionSQL() {
Optional<String> actual = sqlBuilder.buildQueryCurrentPositionSQL();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e7a37ae5aef..31cd5f1fb39 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -36,6 +36,13 @@ public final class H2PipelineSQLBuilder implements
DialectPipelineSQLBuilder {
return String.format("SELECT * FROM %s LIMIT 1", qualifiedTableName);
}
+    @Override
+    public String buildSplitByUniqueKeyRangedSubqueryClause(final String qualifiedTableName, final String uniqueKey, final boolean hasLowerValue) {
+        if (!hasLowerValue) {
+            // First range: no lower bound, the only placeholder is the row limit.
+            return String.format("SELECT %s FROM %s ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey);
+        }
+        // Later ranges: the first placeholder is the exclusive lower bound, the second is the row limit.
+        return String.format("SELECT %s FROM %s WHERE %s>? ORDER BY %s LIMIT ?", uniqueKey, qualifiedTableName, uniqueKey, uniqueKey);
+    }
+
@Override
public Collection<String> buildCreateTableSQLs(final DataSource
dataSource, final String schemaName, final String tableName) {
ShardingSpherePreconditions.checkState("t_order".equalsIgnoreCase(tableName),
() -> new CreateTableSQLGenerateException(tableName));