This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 895187377ef Decouple ShardingSpherePreparedStatement and BatchPreparedStatementExecutor (#31586)
895187377ef is described below
commit 895187377ef7287073e3f2846179c3423491d74c
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Jun 4 23:27:45 2024 +0800
Decouple ShardingSpherePreparedStatement and BatchPreparedStatementExecutor (#31586)
* Decouple ShardingSpherePreparedStatement and BatchPreparedStatementExecutor
* Decouple ShardingSpherePreparedStatement and BatchPreparedStatementExecutor
---
.../executor/DriverExecuteBatchExecutor.java | 42 +++++++++++++++++-----
.../driver/executor/DriverExecutorFacade.java | 19 ++++++++--
.../batch/BatchPreparedStatementExecutor.java | 1 -
.../PreparedStatementParametersReplayCallback.java | 37 +++++++++++++++++++
.../statement/ShardingSpherePreparedStatement.java | 24 +++----------
.../core/statement/ShardingSphereStatement.java | 2 +-
6 files changed, 92 insertions(+), 33 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteBatchExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteBatchExecutor.java
index ddfcad31bb2..09d2356ee70 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteBatchExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecuteBatchExecutor.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.driver.executor;
-import lombok.RequiredArgsConstructor;
+import lombok.Getter;
import org.apache.shardingsphere.driver.executor.batch.BatchExecutionUnit;
import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.executor.callback.keygen.GeneratedKeyCallback;
-import
org.apache.shardingsphere.driver.executor.callback.replay.StatementReplayCallback;
+import
org.apache.shardingsphere.driver.executor.callback.replay.PreparedStatementParametersReplayCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
@@ -43,7 +44,9 @@ import
org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -53,15 +56,21 @@ import java.util.Optional;
/**
* Driver execute batch executor.
*/
-@RequiredArgsConstructor
public final class DriverExecuteBatchExecutor {
private final ShardingSphereConnection connection;
private final ShardingSphereMetaData metaData;
+ @Getter
private final BatchPreparedStatementExecutor
batchPreparedStatementExecutor;
+ public DriverExecuteBatchExecutor(final ShardingSphereConnection
connection, final ShardingSphereMetaData metaData, final ShardingSphereDatabase
database, final JDBCExecutor jdbcExecutor) {
+ this.connection = connection;
+ this.metaData = metaData;
+ batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(database, jdbcExecutor,
connection.getProcessId());
+ }
+
/**
* Add batch.
*
@@ -96,7 +105,7 @@ public final class DriverExecuteBatchExecutor {
* @param prepareEngine prepare engine
* @param executionContext execution context
* @param addCallback statement add callback
- * @param replayCallback statement replay callback
+ * @param replayCallback prepared statement parameters replay callback
* @param generatedKeyCallback generated key callback
* @return generated keys
* @throws SQLException SQL exception
@@ -104,7 +113,7 @@ public final class DriverExecuteBatchExecutor {
@SuppressWarnings("rawtypes")
public int[] executeBatch(final ShardingSphereDatabase database, final
SQLStatementContext sqlStatementContext, final Collection<Comparable<?>>
generatedValues,
final StatementOption statementOption, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final ExecutionContext executionContext, final
StatementAddCallback addCallback, final StatementReplayCallback replayCallback,
+ final ExecutionContext executionContext, final
StatementAddCallback addCallback, final
PreparedStatementParametersReplayCallback replayCallback,
final GeneratedKeyCallback generatedKeyCallback)
throws SQLException {
// TODO add raw SQL executor
return doExecuteBatch(database, batchPreparedStatementExecutor,
@@ -115,7 +124,8 @@ public final class DriverExecuteBatchExecutor {
private int[] doExecuteBatch(final ShardingSphereDatabase database, final
BatchPreparedStatementExecutor batchExecutor,
final SQLStatementContext
sqlStatementContext, final Collection<Comparable<?>> generatedValues, final
StatementOption statementOption,
final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final ExecutionContext executionContext,
- final StatementAddCallback addCallback, final
StatementReplayCallback replayCallback, final GeneratedKeyCallback
generatedKeyCallback) throws SQLException {
+ final StatementAddCallback addCallback, final
PreparedStatementParametersReplayCallback replayCallback,
+ final GeneratedKeyCallback
generatedKeyCallback) throws SQLException {
initBatchPreparedStatementExecutor(database, batchExecutor,
prepareEngine, executionContext, replayCallback);
int[] result = batchExecutor.executeBatch(sqlStatementContext);
if (statementOption.isReturnGeneratedKeys() &&
generatedValues.isEmpty()) {
@@ -127,7 +137,7 @@ public final class DriverExecuteBatchExecutor {
private void initBatchPreparedStatementExecutor(final
ShardingSphereDatabase database, final BatchPreparedStatementExecutor
batchExecutor,
final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final ExecutionContext
executionContext, final StatementReplayCallback replayCallback) throws
SQLException {
+ final ExecutionContext
executionContext, final PreparedStatementParametersReplayCallback
replayCallback) throws SQLException {
List<ExecutionUnit> executionUnits = new
ArrayList<>(batchExecutor.getBatchExecutionUnits().size());
for (BatchExecutionUnit each : batchExecutor.getBatchExecutionUnits())
{
ExecutionUnit executionUnit = each.getExecutionUnit();
@@ -135,6 +145,22 @@ public final class DriverExecuteBatchExecutor {
}
batchExecutor.init(prepareEngine
.prepare(executionContext.getRouteContext(), executionUnits,
new ExecutionGroupReportContext(connection.getProcessId(), database.getName(),
new Grantee("", ""))));
- replayCallback.replay();
+ setBatchParameters(replayCallback);
+ }
+
+ private void setBatchParameters(final
PreparedStatementParametersReplayCallback replayCallback) throws SQLException {
+ for (Statement each : batchPreparedStatementExecutor.getStatements()) {
+ for (List<Object> eachParams :
batchPreparedStatementExecutor.getParameterSet(each)) {
+ replayCallback.replay((PreparedStatement) each, eachParams);
+ ((PreparedStatement) each).addBatch();
+ }
+ }
+ }
+
+ /**
+ * Clear.
+ */
+ public void clear() {
+ batchPreparedStatementExecutor.clear();
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
index a3ab77d409d..5a745229c34 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutorFacade.java
@@ -18,12 +18,12 @@
package org.apache.shardingsphere.driver.executor;
import lombok.Getter;
-import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
@@ -50,7 +50,11 @@ public final class DriverExecutorFacade implements
AutoCloseable {
@Getter
private final DriverExecuteBatchExecutor executeBatchExecutor;
- public DriverExecutorFacade(final ShardingSphereConnection connection,
final BatchPreparedStatementExecutor batchPreparedStatementExecutor) {
+ public DriverExecutorFacade(final ShardingSphereConnection connection) {
+ this(connection, null);
+ }
+
+ public DriverExecutorFacade(final ShardingSphereConnection connection,
final ShardingSphereDatabase database) {
JDBCExecutor jdbcExecutor = new
JDBCExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
DriverJDBCExecutor regularExecutor = new
DriverJDBCExecutor(connection.getDatabaseName(),
connection.getContextManager(), jdbcExecutor);
RawExecutor rawExecutor = new
RawExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
@@ -61,7 +65,16 @@ public final class DriverExecutorFacade implements
AutoCloseable {
queryExecutor = new DriverExecuteQueryExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
updateExecutor = new DriverExecuteUpdateExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor);
executeExecutor = new DriverExecuteExecutor(connection, metaData,
regularExecutor, rawExecutor, trafficExecutor, sqlFederationEngine);
- executeBatchExecutor = new DriverExecuteBatchExecutor(connection,
metaData, batchPreparedStatementExecutor);
+ executeBatchExecutor = null == database ? null : new
DriverExecuteBatchExecutor(connection, metaData, database, jdbcExecutor);
+ }
+
+ /**
+ * Clear.
+ */
+ public void clear() {
+ if (null != executeBatchExecutor) {
+ executeBatchExecutor.clear();
+ }
}
@Override
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index d60344af42b..7975436642e 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -248,7 +248,6 @@ public final class BatchPreparedStatementExecutor {
* Clear.
*/
public void clear() {
- getStatements().clear();
executionGroupContext.getInputGroups().clear();
batchCount = 0;
batchExecutionUnits.clear();
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/replay/PreparedStatementParametersReplayCallback.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/replay/PreparedStatementParametersReplayCallback.java
new file mode 100644
index 00000000000..a725d773ee8
--- /dev/null
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/callback/replay/PreparedStatementParametersReplayCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.driver.executor.callback.replay;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Prepared statement parameters replay callback.
+ */
+public interface PreparedStatementParametersReplayCallback {
+
+ /**
+ * Replay to set prepared statement parameters.
+ *
+ * @param preparedStatement prepared statement
+ * @param params parameters
+ * @throws SQLException SQL exception
+ */
+ void replay(PreparedStatement preparedStatement, List<Object> params)
throws SQLException;
+}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index e75e74a08ba..dd2c338cf86 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.driver.jdbc.core.statement;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.DriverExecutorFacade;
-import
org.apache.shardingsphere.driver.executor.batch.BatchPreparedStatementExecutor;
import
org.apache.shardingsphere.driver.executor.callback.add.StatementAddCallback;
import
org.apache.shardingsphere.driver.jdbc.adapter.AbstractPreparedStatementAdapter;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
@@ -43,7 +42,6 @@ import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEn
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
@@ -108,8 +106,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private final DriverExecutorFacade driverExecutorFacade;
- private final BatchPreparedStatementExecutor
batchPreparedStatementExecutor;
-
private final Collection<Comparable<?>> generatedValues = new
LinkedList<>();
private final boolean statementsCacheable;
@@ -167,9 +163,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement);
statementOption = returnGeneratedKeys ? new StatementOption(true,
columns) : new StatementOption(resultSetType, resultSetConcurrency,
resultSetHoldability);
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
- JDBCExecutor jdbcExecutor = new
JDBCExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
- batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(database, jdbcExecutor,
connection.getProcessId());
- driverExecutorFacade = new DriverExecutorFacade(connection,
batchPreparedStatementExecutor);
+ driverExecutorFacade = new DriverExecutorFacade(connection, database);
statementsCacheable =
isStatementsCacheable(database.getRuleMetaData());
selectContainsEnhancedTable = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
statementManager = new StatementManager();
@@ -203,7 +197,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
handleExceptionInTransaction(connection, metaData);
throw SQLExceptionTransformEngine.toSQLException(ex,
metaData.getDatabase(databaseName).getProtocolType());
} finally {
- batchPreparedStatementExecutor.clear();
+ driverExecutorFacade.clear();
clearParameters();
}
}
@@ -403,7 +397,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
try {
return
driverExecutorFacade.getExecuteBatchExecutor().executeBatch(database,
sqlStatementContext, generatedValues, statementOption,
createDriverExecutionPrepareEngine(database),
executionContext, (StatementAddCallback<PreparedStatement>) (statements,
parameterSets) -> this.statements.addAll(statements),
- () ->
setBatchParametersForStatements(batchPreparedStatementExecutor),
+ this::replaySetParameter,
() -> {
currentBatchGeneratedKeysResultSet =
getGeneratedKeys();
statements.clear();
@@ -418,20 +412,10 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
}
}
- private void setBatchParametersForStatements(final
BatchPreparedStatementExecutor batchExecutor) throws SQLException {
- for (Statement each : batchExecutor.getStatements()) {
- List<List<Object>> paramSet = batchExecutor.getParameterSet(each);
- for (List<Object> eachParams : paramSet) {
- replaySetParameter((PreparedStatement) each, eachParams);
- ((PreparedStatement) each).addBatch();
- }
- }
- }
-
@Override
public void clearBatch() {
currentResultSet = null;
- batchPreparedStatementExecutor.clear();
+ driverExecutorFacade.clear();
clearParameters();
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 11d44a94e5d..90d48bb9a45 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -113,7 +113,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
metaData =
connection.getContextManager().getMetaDataContexts().getMetaData();
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
- driverExecutorFacade = new DriverExecutorFacade(connection, null);
+ driverExecutorFacade = new DriverExecutorFacade(connection);
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();