This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f1b654e4ee3 Refactor JDBCStreamQueryBuilder (#27393)
f1b654e4ee3 is described below
commit f1b654e4ee319fe6d87b005d1b0efd188dcac1f9
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Jul 24 01:03:18 2023 +0800
Refactor JDBCStreamQueryBuilder (#27393)
* Refactor DatabaseTypeEngine
* Refactor JDBCStreamQueryBuilder
---
.../infra/database/DatabaseTypeEngine.java | 10 +--
.../infra/database/spi/DatabaseType.java | 4 +-
.../query/DialectJDBCStreamQueryBuilder.java | 44 ++++++++++++
.../common/query/JDBCStreamQueryBuilder.java | 56 +++++++++++++++
.../query/dialect/H2JDBCStreamQueryBuilder.java | 42 +++++++++++
.../query/dialect/MySQLJDBCStreamQueryBuilder.java | 44 ++++++++++++
.../dialect/OpenGaussJDBCStreamQueryBuilder.java | 44 ++++++++++++
.../dialect/PostgreSQLJDBCStreamQueryBuilder.java | 44 ++++++++++++
.../pipeline/common/util/JDBCStreamQueryUtils.java | 81 ----------------------
...DataMatchDataConsistencyCalculateAlgorithm.java | 6 +-
.../data/pipeline/core/dumper/InventoryDumper.java | 6 +-
...line.common.query.DialectJDBCStreamQueryBuilder | 21 ++++++
12 files changed, 308 insertions(+), 94 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
index 67dfaab89da..04d5780c508 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/database/DatabaseTypeEngine.java
@@ -71,6 +71,11 @@ public final class DatabaseTypeEngine {
return configuredDatabaseType.orElseGet(() ->
getStorageType(getEnabledDataSources(databaseConfigs).values()));
}
+ private static Optional<DatabaseType> findConfiguredDatabaseType(final
ConfigurationProperties props) {
+ DatabaseType configuredDatabaseType =
props.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE);
+ return null == configuredDatabaseType ? Optional.empty() :
Optional.of(configuredDatabaseType.getTrunkDatabaseType().orElse(configuredDatabaseType));
+ }
+
private static Map<String, DataSource> getEnabledDataSources(final
Map<String, ? extends DatabaseConfiguration> databaseConfigs) {
Map<String, DataSource> result = new LinkedHashMap<>();
for (Entry<String, ? extends DatabaseConfiguration> entry :
databaseConfigs.entrySet()) {
@@ -127,11 +132,6 @@ public final class DatabaseTypeEngine {
}
}
- private static Optional<DatabaseType> findConfiguredDatabaseType(final
ConfigurationProperties props) {
- DatabaseType configuredDatabaseType =
props.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE);
- return null == configuredDatabaseType ? Optional.empty() :
Optional.of(configuredDatabaseType.getTrunkDatabaseType().orElse(configuredDatabaseType));
- }
-
/**
* Get default schema name.
*
diff --git
a/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
b/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
index a75f55c932b..1ed6e80f8d1 100644
---
a/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
+++
b/infra/database/spi/src/main/java/org/apache/shardingsphere/infra/database/spi/DatabaseType.java
@@ -59,9 +59,9 @@ public interface DatabaseType extends TypedSPI {
}
/**
- * Get alias of JDBC URL prefixes.
+ * Get JDBC URL prefixes.
*
- * @return Alias of JDBC URL prefixes
+ * @return prefixes of JDBC URL
*/
Collection<String> getJdbcUrlPrefixes();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..b0ef748d73d
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/DialectJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query;
+
+import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Dialect JDBC stream query builder.
+ */
+@SingletonSPI
+public interface DialectJDBCStreamQueryBuilder extends DatabaseTypedSPI {
+
+ /**
+ * Build streamed prepared statement.
+ *
+ * @param connection connection
+ * @param databaseType database type
+ * @param sql SQL to be queried
+ * @return built prepared statement
+ * @throws SQLException SQL exception
+ */
+ PreparedStatement build(DatabaseType databaseType, Connection connection,
String sql) throws SQLException;
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..c3c29fee756
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/JDBCStreamQueryBuilder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * JDBC stream query builder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public final class JDBCStreamQueryBuilder {
+
+ /**
+ * Build streamed prepared statement.
+ *
+ * @param connection connection
+ * @param databaseType database type
+ * @param sql SQL to be queried
+ * @return built prepared statement
+ * @throws SQLException SQL exception
+ */
+ public static PreparedStatement build(final DatabaseType databaseType,
final Connection connection, final String sql) throws SQLException {
+ Optional<DialectJDBCStreamQueryBuilder> dialectBuilder =
DatabaseTypedSPILoader.findService(DialectJDBCStreamQueryBuilder.class,
databaseType);
+ if (dialectBuilder.isPresent()) {
+ return dialectBuilder.get().build(databaseType, connection, sql);
+ }
+ log.warn("not support {} streaming query now, pay attention to memory
usage", databaseType.getType());
+ return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..7744207c709
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/H2JDBCStreamQueryBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for H2.
+ */
+public final class H2JDBCStreamQueryBuilder implements
DialectJDBCStreamQueryBuilder {
+
+ @Override
+ public PreparedStatement build(final DatabaseType databaseType, final
Connection connection, final String sql) throws SQLException {
+ return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ }
+
+ @Override
+ public String getDatabaseType() {
+ return "H2";
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..76b9e605255
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/MySQLJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for MySQL.
+ */
+public final class MySQLJDBCStreamQueryBuilder implements
DialectJDBCStreamQueryBuilder {
+
+ @Override
+ public PreparedStatement build(final DatabaseType databaseType, final
Connection connection, final String sql) throws SQLException {
+ PreparedStatement result = connection.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ result.setFetchSize(Integer.MIN_VALUE);
+ return result;
+ }
+
+ @Override
+ public String getDatabaseType() {
+ return "MySQL";
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..6ef46996c49
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/OpenGaussJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for openGauss.
+ */
+public final class OpenGaussJDBCStreamQueryBuilder implements
DialectJDBCStreamQueryBuilder {
+
+ @Override
+ public PreparedStatement build(final DatabaseType databaseType, final
Connection connection, final String sql) throws SQLException {
+ PreparedStatement result = connection.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.CLOSE_CURSORS_AT_COMMIT);
+ connection.setAutoCommit(false);
+ return result;
+ }
+
+ @Override
+ public String getDatabaseType() {
+ return "openGauss";
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
new file mode 100644
index 00000000000..d66898f3ce3
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/query/dialect/PostgreSQLJDBCStreamQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.query.dialect;
+
+import
org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * JDBC stream query builder for PostgreSQL.
+ */
+public final class PostgreSQLJDBCStreamQueryBuilder implements
DialectJDBCStreamQueryBuilder {
+
+ @Override
+ public PreparedStatement build(final DatabaseType databaseType, final
Connection connection, final String sql) throws SQLException {
+ PreparedStatement result = connection.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.CLOSE_CURSORS_AT_COMMIT);
+ connection.setAutoCommit(false);
+ return result;
+ }
+
+ @Override
+ public String getDatabaseType() {
+ return "PostgreSQL";
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
deleted file mode 100644
index 71b077acd66..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/JDBCStreamQueryUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.database.h2.H2DatabaseType;
-import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-/**
- * JDBC stream query utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-@Slf4j
-public final class JDBCStreamQueryUtils {
-
- /**
- * Generate stream query prepared statement.
- *
- * @param connection connection
- * @param databaseType database type
- * @param sql sql
- * @return stream query prepared statement
- * @throws SQLException SQL exception
- */
- public static PreparedStatement generateStreamQueryPreparedStatement(final
DatabaseType databaseType, final Connection connection, final String sql)
throws SQLException {
- if (databaseType instanceof MySQLDatabaseType) {
- return generateForMySQL(connection, sql);
- }
- if (databaseType.getDefaultSchema().isPresent()) {
- return generateForPostgreSQL(connection, sql);
- }
- if (databaseType instanceof H2DatabaseType) {
- return generateByDefault(connection, sql);
- }
- if (databaseType.getTrunkDatabaseType().isPresent()) {
- return
generateStreamQueryPreparedStatement(databaseType.getTrunkDatabaseType().get(),
connection, sql);
- }
- log.warn("not support {} streaming query now, pay attention to memory
usage", databaseType.getType());
- return generateByDefault(connection, sql);
- }
-
- // TODO Consider use SPI
- private static PreparedStatement generateForMySQL(final Connection
connection, final String sql) throws SQLException {
- PreparedStatement result = connection.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- result.setFetchSize(Integer.MIN_VALUE);
- return result;
- }
-
- private static PreparedStatement generateForPostgreSQL(final Connection
connection, final String sql) throws SQLException {
- PreparedStatement result = connection.prepareStatement(sql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.CLOSE_CURSORS_AT_COMMIT);
- connection.setAutoCommit(false);
- return result;
- }
-
- private static PreparedStatement generateByDefault(final Connection
connection, final String sql) throws SQLException {
- return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 883a2e2740b..3ee3a6480be 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -20,16 +20,16 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineDataConsistencyCalculateSQLBuilder;
-import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataConsistencyCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.DataMatchCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.dumper.ColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
@@ -143,7 +143,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
private void fulfillCalculationContext(final CalculationContext
calculationContext, final DataConsistencyCalculateParameter param) throws
SQLException {
String sql = getQuerySQL(param);
- PreparedStatement preparedStatement =
JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(param.getDatabaseType(),
calculationContext.getConnection(), sql);
+ PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(param.getDatabaseType(),
calculationContext.getConnection(), sql);
setCurrentStatement(preparedStatement);
if (!(param.getDatabaseType() instanceof MySQLDatabaseType)) {
preparedStatement.setFetchSize(chunkSize);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 47e4c2ce838..11bdb10d8af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -40,14 +40,14 @@ import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.PrimaryKeyPositionFactory;
+import
org.apache.shardingsphere.data.pipeline.common.query.JDBCStreamQueryBuilder;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineInventoryDumpSQLBuilder;
-import
org.apache.shardingsphere.data.pipeline.common.util.JDBCStreamQueryUtils;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.spi.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.spi.DatabaseType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
@@ -117,7 +117,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
if (null != dumperConfig.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperConfig.getTransactionIsolation());
}
- try (PreparedStatement preparedStatement =
JDBCStreamQueryUtils.generateStreamQueryPreparedStatement(databaseType,
connection, buildInventoryDumpSQL())) {
+ try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildInventoryDumpSQL())) {
dumpStatement.set(preparedStatement);
if (!(databaseType instanceof MySQLDatabaseType)) {
preparedStatement.setFetchSize(batchSize);
diff --git
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
new file mode 100644
index 00000000000..840475b7da2
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.query.DialectJDBCStreamQueryBuilder
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.common.query.dialect.MySQLJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.PostgreSQLJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.OpenGaussJDBCStreamQueryBuilder
+org.apache.shardingsphere.data.pipeline.common.query.dialect.H2JDBCStreamQueryBuilder