This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1118bcde7c2 Move executeUpdate to DriverExecutor (#31452)
1118bcde7c2 is described below
commit 1118bcde7c2d111c3b6da6dc5a2a7c205da1a796
Author: Liang Zhang <[email protected]>
AuthorDate: Fri May 31 08:40:14 2024 +0800
Move executeUpdate to DriverExecutor (#31452)
* Move executeUpdate to DriverExecutor
* Move executeUpdate to DriverExecutor
---
.../driver/executor/DriverExecutor.java | 101 ++++++++++++++++++++-
.../statement/ShardingSpherePreparedStatement.java | 57 ++----------
.../core/statement/ShardingSphereStatement.java | 54 ++---------
3 files changed, 110 insertions(+), 102 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 10878e41d36..0b464cd1da7 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.driver.executor;
import lombok.Getter;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
import
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
@@ -29,20 +30,26 @@ import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementCont
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
+import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
@@ -55,11 +62,13 @@ import
org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -249,17 +258,99 @@ public final class DriverExecutor implements
AutoCloseable {
* @param queryContext query context
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
+ * @param updateCallback update callback
+ * @param isNeedImplicitCommitTransaction is need implicit commit
transaction
+ * @param statementReplayCallback statement replay callback
+ * @param executionContext execution context
* @return updated row count
* @throws SQLException SQL exception
*/
- public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database, final QueryContext
queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final
TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
+ @SuppressWarnings("rawtypes")
+ public int executeAdvanceUpdate(final ShardingSphereMetaData metaData,
final ShardingSphereDatabase database, final QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Integer> trafficCallback,
+ final ExecuteUpdateCallback
updateCallback, final boolean isNeedImplicitCommitTransaction,
+ final StatementReplayCallback
statementReplayCallback, final ExecutionContext executionContext) throws
SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
- return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback));
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback);
}
- return Optional.empty();
+ return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
+ ? executeUpdate(database, updateCallback,
queryContext.getSqlStatementContext(), executionContext, prepareEngine,
isNeedImplicitCommitTransaction, statementReplayCallback)
+ :
accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData,
database, executionContext), queryContext, new RawSQLExecutorCallback()));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private int executeUpdate(final ShardingSphereDatabase database, final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext, final ExecutionContext executionContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final boolean isNeedImplicitCommitTransaction,
+ final StatementReplayCallback
statementReplayCallback) throws SQLException {
+ return isNeedImplicitCommitTransaction
+ ? executeWithImplicitCommitTransaction(() ->
useDriverToExecuteUpdate(
+ database, updateCallback, sqlStatementContext,
executionContext, prepareEngine, statementReplayCallback), connection,
database.getProtocolType())
+ : useDriverToExecuteUpdate(database, updateCallback,
sqlStatementContext, executionContext, prepareEngine, statementReplayCallback);
+ }
+
+ private <T> T executeWithImplicitCommitTransaction(final
ImplicitTransactionCallback<T> callback, final Connection connection, final
DatabaseType databaseType) throws SQLException {
+ T result;
+ try {
+ connection.setAutoCommit(false);
+ result = callback.execute();
+ connection.commit();
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ connection.rollback();
+ throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
+ } finally {
+ connection.setAutoCommit(true);
+ }
+ return result;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private int useDriverToExecuteUpdate(final ShardingSphereDatabase
database, final ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext,
+ final ExecutionContext
executionContext, final DriverExecutionPrepareEngine<JDBCExecutionUnit,
Connection> prepareEngine,
+ final StatementReplayCallback
statementReplayCallback) throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(database, executionContext, prepareEngine);
+ for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
+ statements.addAll(getStatements(each));
+ if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
+ parameterSets.addAll(getParameterSets(each));
+ }
+ }
+ statementReplayCallback.replay(statements, parameterSets);
+ JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(database, updateCallback, sqlStatementContext,
prepareEngine.getType());
+ return regularExecutor.executeUpdate(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
+ }
+
+ private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ShardingSphereDatabase database, final
ExecutionContext executionContext,
+
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine) throws SQLException {
+ return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
+ new ExecutionGroupReportContext(connection.getProcessId(),
database.getName(), new Grantee("", "")));
+ }
+
+ private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ShardingSphereDatabase database,
+ final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext, final String jdbcDriverType) {
+ boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
+ return new JDBCExecutorCallback<Integer>(database.getProtocolType(),
database.getResourceMetaData(), sqlStatementContext.getSqlStatement(),
isExceptionThrown) {
+
+ @Override
+ protected Integer executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
+ return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ?
updateCallback.executeUpdate(sql, statement) : ((PreparedStatement)
statement).executeUpdate();
+ }
+
+ @Override
+ protected Optional<Integer> getSaneResult(final SQLStatement
sqlStatement, final SQLException ex) {
+ return Optional.empty();
+ }
+ };
+ }
+
+ private int accumulate(final Collection<ExecuteResult> results) {
+ int result = 0;
+ for (ExecuteResult each : results) {
+ result += ((UpdateResult) each).getUpdateCount();
+ }
+ return result;
}
/**
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index f029ee03fd1..f185ba74bc9 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -59,7 +59,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -277,18 +276,16 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- Optional<Integer> updatedCount =
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database),
- (statement, sql) -> ((PreparedStatement)
statement).executeUpdate());
- if (updatedCount.isPresent()) {
- return updatedCount.get();
- }
ExecutionContext executionContext =
createExecutionContext(queryContext);
- if (hasRawExecutionRule()) {
- Collection<ExecuteResult> results =
-
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
- return accumulate(results);
+ boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
+ final int result =
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database),
+ (statement, sql) -> ((PreparedStatement)
statement).executeUpdate(), null, isNeedImplicitCommitTransaction,
(StatementReplayCallback<PreparedStatement>) this::replay,
+ executionContext);
+ for (Statement each : executor.getStatements()) {
+ statements.add((PreparedStatement) each);
}
- return executeUpdateWithExecutionContext(executionContext);
+ parameterSets.addAll(executor.getParameterSets());
+ return result;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -299,38 +296,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
- private int useDriverToExecuteUpdate(final ExecutionContext
executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
- executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(),
createExecuteUpdateCallback());
- }
-
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
- return result;
- }
-
- private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() {
- boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
- return new
JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, isExceptionThrown) {
-
- @Override
- protected Integer executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return ((PreparedStatement) statement).executeUpdate();
- }
-
- @Override
- protected Optional<Integer> getSaneResult(final SQLStatement
sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
- }
-
@Override
public boolean execute() throws SQLException {
try {
@@ -370,12 +335,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
: useDriverToExecute(executionContext);
}
- private int executeUpdateWithExecutionContext(final ExecutionContext
executionContext) throws SQLException {
- return isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1)
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecuteUpdate(executionContext), connection,
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
- : useDriverToExecuteUpdate(executionContext);
- }
-
private boolean useDriverToExecute(final ExecutionContext
executionContext) throws SQLException {
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
cacheStatements(executionGroupContext.getInputGroups());
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 1065c068557..3486275a6ef 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -54,7 +54,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.update.UpdateResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
@@ -232,13 +231,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private int executeUpdate(final ExecuteUpdateCallback updateCallback,
final SQLStatementContext sqlStatementContext, final ExecutionContext
executionContext) throws SQLException {
- return isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1)
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecuteUpdate(updateCallback, sqlStatementContext,
executionContext), connection,
-
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
- : useDriverToExecuteUpdate(updateCallback,
sqlStatementContext, executionContext);
- }
-
private int executeUpdate0(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
@@ -246,47 +238,13 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
- Optional<Integer> updatedCount =
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database,
queryContext, createDriverExecutionPrepareEngine(database), trafficCallback);
- if (updatedCount.isPresent()) {
- return updatedCount.get();
- }
ExecutionContext executionContext =
createExecutionContext(queryContext);
- if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
- Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
queryContext, new RawSQLExecutorCallback());
- return accumulate(results);
- }
- return executeUpdate(updateCallback,
queryContext.getSqlStatementContext(), executionContext);
- }
-
- private int useDriverToExecuteUpdate(final ExecuteUpdateCallback
updateCallback, final SQLStatementContext sqlStatementContext, final
ExecutionContext executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- JDBCExecutorCallback<Integer> callback =
createExecuteUpdateCallback(updateCallback, sqlStatementContext);
- return
executor.getRegularExecutor().executeUpdate(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), callback);
- }
-
- private JDBCExecutorCallback<Integer> createExecuteUpdateCallback(final
ExecuteUpdateCallback updateCallback, final SQLStatementContext
sqlStatementContext) {
- boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
- return new
JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatementContext.getSqlStatement(), isExceptionThrown) {
-
- @Override
- protected Integer executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return updateCallback.executeUpdate(sql, statement);
- }
-
- @Override
- protected Optional<Integer> getSaneResult(final SQLStatement
sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
- }
-
- private int accumulate(final Collection<ExecuteResult> results) {
- int result = 0;
- for (ExecuteResult each : results) {
- result += ((UpdateResult) each).getUpdateCount();
- }
+ boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
+ int result = executor.executeAdvanceUpdate(
+ metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback,
isNeedImplicitCommitTransaction,
+ (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements), executionContext);
+ statements.addAll(executor.getStatements());
+ replay(statements);
return result;
}