This is an automated email from the ASF dual-hosted git repository.
zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c27fa6c1259 Move execute to DriverExecutor (#31513)
c27fa6c1259 is described below
commit c27fa6c1259aea2e5bd3674883a2d3e088be527d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jun 1 03:04:59 2024 +0800
Move execute to DriverExecutor (#31513)
---
.../driver/executor/DriverExecutor.java | 95 ++++++++++++++++---
.../jdbc/adapter/AbstractStatementAdapter.java | 41 --------
.../statement/ShardingSpherePreparedStatement.java | 99 +++-----------------
.../core/statement/ShardingSphereStatement.java | 103 ++-------------------
4 files changed, 104 insertions(+), 234 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index f111e352828..86cf6ff9399 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.driver.executor;
import lombok.Getter;
+import org.apache.shardingsphere.driver.executor.callback.ExecuteCallback;
import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
import
org.apache.shardingsphere.driver.executor.callback.ExecuteUpdateCallback;
import
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
@@ -63,12 +64,16 @@ import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttrib
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
+import org.apache.shardingsphere.transaction.api.TransactionType;
import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
+import org.apache.shardingsphere.transaction.rule.TransactionRule;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -259,21 +264,21 @@ public final class DriverExecutor implements
AutoCloseable {
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
* @param updateCallback update callback
- * @param isNeedImplicitCommitTransaction is need implicit commit
transaction
* @param statementReplayCallback statement replay callback
- * @param executionContext execution context
* @return updated row count
* @throws SQLException SQL exception
*/
@SuppressWarnings("rawtypes")
public int executeUpdate(final ShardingSphereMetaData metaData, final
ShardingSphereDatabase database, final QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final TrafficExecutorCallback<Integer>
trafficCallback, final ExecuteUpdateCallback updateCallback, final
StatementReplayCallback statementReplayCallback,
- final boolean isNeedImplicitCommitTransaction,
final ExecutionContext executionContext) throws SQLException {
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Integer> trafficCallback,
+ final ExecuteUpdateCallback updateCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
+ ExecutionContext executionContext = createExecutionContext(metaData,
database, queryContext);
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback);
}
+ boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(
+ connection,
queryContext.getSqlStatementContext().getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
? executeUpdate(database, updateCallback,
queryContext.getSqlStatementContext(), executionContext, prepareEngine,
isNeedImplicitCommitTransaction, statementReplayCallback)
:
accumulate(rawExecutor.execute(createRawExecutionGroupContext(metaData,
database, executionContext), queryContext, new RawSQLExecutorCallback()));
@@ -361,24 +366,92 @@ public final class DriverExecutor implements
AutoCloseable {
* @param queryContext query context
* @param prepareEngine prepare engine
* @param trafficCallback traffic callback
+ * @param executeCallback execute callback
+ * @param statementReplayCallback statement replay callback
* @return execute result
* @throws SQLException SQL exception
*/
- public Optional<Boolean> executeAdvance(final ShardingSphereMetaData
metaData, final ShardingSphereDatabase database,
- final QueryContext queryContext,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final
TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
+ @SuppressWarnings("rawtypes")
+ public boolean executeAdvance(final ShardingSphereMetaData metaData, final
ShardingSphereDatabase database, final QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Boolean> trafficCallback,
+ final ExecuteCallback executeCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
executeType = ExecuteType.TRAFFIC;
- return
Optional.of(trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback));
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback);
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
ResultSet resultSet = sqlFederationEngine.executeQuery(
prepareEngine, getExecuteQueryCallback(database,
queryContext, prepareEngine.getType()), new SQLFederationContext(false,
queryContext, metaData, connection.getProcessId()));
- return Optional.of(null != resultSet);
+ return null != resultSet;
+ }
+ ExecutionContext executionContext = createExecutionContext(metaData,
database, queryContext);
+ if
(!database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
+ Collection<ExecuteResult> results =
rawExecutor.execute(createRawExecutionGroupContext(metaData, database,
executionContext), queryContext, new RawSQLExecutorCallback());
+ return results.iterator().next() instanceof QueryResult;
+ }
+ boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(
+ connection,
queryContext.getSqlStatementContext().getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
+ return executeWithExecutionContext(database, executeCallback,
executionContext, prepareEngine, isNeedImplicitCommitTransaction,
statementReplayCallback);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private boolean executeWithExecutionContext(final ShardingSphereDatabase
database, final ExecuteCallback executeCallback, final ExecutionContext
executionContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+ final boolean
isNeedImplicitCommitTransaction, final StatementReplayCallback
statementReplayCallback) throws SQLException {
+ return isNeedImplicitCommitTransaction
+ ? executeWithImplicitCommitTransaction(() ->
useDriverToExecute(database, executeCallback, executionContext, prepareEngine,
statementReplayCallback), connection,
+ database.getProtocolType())
+ : useDriverToExecute(database, executeCallback,
executionContext, prepareEngine, statementReplayCallback);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private boolean useDriverToExecute(final ShardingSphereDatabase database,
final ExecuteCallback callback, final ExecutionContext executionContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final StatementReplayCallback statementReplayCallback) throws SQLException {
+ ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(database, executionContext, prepareEngine);
+ for (ExecutionGroup<JDBCExecutionUnit> each :
executionGroupContext.getInputGroups()) {
+ statements.addAll(getStatements(each));
+ if
(JDBCDriverType.PREPARED_STATEMENT.equals(prepareEngine.getType())) {
+ parameterSets.addAll(getParameterSets(each));
+ }
}
- return Optional.empty();
+ statementReplayCallback.replay(statements, parameterSets);
+ JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(database, callback,
executionContext.getSqlStatementContext().getSqlStatement(),
prepareEngine.getType());
+ return regularExecutor.execute(executionGroupContext,
executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
+ }
+
+ private JDBCExecutorCallback<Boolean> createExecuteCallback(final
ShardingSphereDatabase database, final ExecuteCallback executeCallback,
+ final
SQLStatement sqlStatement, final String jdbcDriverType) {
+ boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
+ return new JDBCExecutorCallback<Boolean>(database.getProtocolType(),
database.getResourceMetaData(), sqlStatement, isExceptionThrown) {
+
+ @Override
+ protected Boolean executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
+ return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ?
executeCallback.execute(sql, statement) : ((PreparedStatement)
statement).execute();
+ }
+
+ @Override
+ protected Optional<Boolean> getSaneResult(final SQLStatement
sqlStatement1, final SQLException ex) {
+ return Optional.empty();
+ }
+ };
+ }
+
+ private boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final SQLStatement sqlStatement, final
boolean multiExecutionUnits) {
+ if (!connection.getAutoCommit()) {
+ return false;
+ }
+ TransactionType transactionType =
connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
+ boolean isInTransaction =
connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
+ if (!TransactionType.isDistributedTransaction(transactionType) ||
isInTransaction) {
+ return false;
+ }
+ return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
+ }
+
+ private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
+ return sqlStatement instanceof DMLStatement && !(sqlStatement
instanceof SelectStatement);
}
/**
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 75646a43cde..d024fc4a6f4 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -26,16 +26,8 @@ import
org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
-import org.apache.shardingsphere.transaction.api.TransactionType;
-import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
-import org.apache.shardingsphere.transaction.rule.TransactionRule;
-import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLWarning;
@@ -61,39 +53,6 @@ public abstract class AbstractStatementAdapter extends
WrapperAdapter implements
private boolean closeOnCompletion;
- protected final boolean isNeedImplicitCommitTransaction(final
ShardingSphereConnection connection, final SQLStatement sqlStatement, final
boolean multiExecutionUnits) {
- if (!connection.getAutoCommit()) {
- return false;
- }
- TransactionType transactionType =
connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
- boolean isInTransaction =
connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction();
- if (!TransactionType.isDistributedTransaction(transactionType) ||
isInTransaction) {
- return false;
- }
- return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
- }
-
- protected final <T> T executeWithImplicitCommitTransaction(final
ImplicitTransactionCallback<T> callback, final Connection connection, final
DatabaseType databaseType) throws SQLException {
- T result;
- try {
- connection.setAutoCommit(false);
- result = callback.execute();
- connection.commit();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- connection.rollback();
- throw SQLExceptionTransformEngine.toSQLException(ex, databaseType);
- } finally {
- connection.setAutoCommit(true);
- }
- return result;
- }
-
- private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
- return sqlStatement instanceof DMLStatement && !(sqlStatement
instanceof SelectStatement);
- }
-
protected final void handleExceptionInTransaction(final
ShardingSphereConnection connection, final MetaDataContexts metaDataContexts) {
if
(connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction())
{
DatabaseType databaseType =
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 1b6e78bee3e..151ffd1bfc2 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -38,31 +38,21 @@ import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatem
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
@@ -74,7 +64,6 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
import org.apache.shardingsphere.infra.route.context.RouteContext;
import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import
org.apache.shardingsphere.infra.rule.attribute.resoure.StorageConnectorReusableRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -115,8 +104,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private final List<List<Object>> parameterSets;
- private final SQLStatement sqlStatement;
-
private final SQLStatementContext sqlStatementContext;
private final String databaseName;
@@ -190,7 +177,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
parameterSets = new ArrayList<>();
SQLParserRule sqlParserRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class);
SQLParserEngine sqlParserEngine =
sqlParserRule.getSQLParserEngine(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType());
- sqlStatement = sqlParserEngine.parse(this.sql, true);
+ SQLStatement sqlStatement = sqlParserEngine.parse(this.sql, true);
sqlStatementContext = new
SQLBindEngine(metaDataContexts.getMetaData(), connection.getDatabaseName(),
hintValueContext).bind(sqlStatement, Collections.emptyList());
databaseName =
sqlStatementContext.getTablesContext().getDatabaseName().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
@@ -243,10 +230,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
- private boolean hasRawExecutionRule() {
- return
!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty();
- }
-
private void handleAutoCommit(final SQLStatement sqlStatement) throws
SQLException {
if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connection.handleAutoCommit();
@@ -276,15 +259,13 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
- int result =
executor.executeUpdate(metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database),
- (statement, sql) -> ((PreparedStatement)
statement).executeUpdate(), null, (StatementReplayCallback<PreparedStatement>)
this::replay,
- isNeedImplicitCommitTransaction, executionContext);
+ final int result =
executor.executeUpdate(metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database),
+ (statement, sql) -> ((PreparedStatement)
statement).executeUpdate(), null, (StatementReplayCallback<PreparedStatement>)
this::replay);
for (Statement each : executor.getStatements()) {
statements.add((PreparedStatement) each);
}
parameterSets.addAll(executor.getParameterSets());
+ findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
return result;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
@@ -307,18 +288,15 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- Optional<Boolean> advancedResult = executor.executeAdvance(
- metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
- if (advancedResult.isPresent()) {
- return advancedResult.get();
- }
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- if (hasRawExecutionRule()) {
- Collection<ExecuteResult> results =
-
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
executionContext.getQueryContext(), new RawSQLExecutorCallback());
- return results.iterator().next() instanceof QueryResult;
+ final boolean result = executor.executeAdvance(
+ metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute(),
+ null, (StatementReplayCallback<PreparedStatement>)
this::replay);
+ for (Statement each : executor.getStatements()) {
+ statements.add((PreparedStatement) each);
}
- return executeWithExecutionContext(executionContext);
+ parameterSets.addAll(executor.getParameterSets());
+ findGeneratedKey().ifPresent(optional ->
generatedValues.addAll(optional.getGeneratedValues()));
+ return result;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -329,43 +307,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
}
- private boolean executeWithExecutionContext(final ExecutionContext
executionContext) throws SQLException {
- return isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1)
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecute(executionContext), connection,
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
- : useDriverToExecute(executionContext);
- }
-
- private boolean useDriverToExecute(final ExecutionContext
executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- return executor.getRegularExecutor().execute(executionGroupContext,
- executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), createExecuteCallback());
- }
-
- private JDBCExecutorCallback<Boolean> createExecuteCallback() {
- boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
- return new
JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, isExceptionThrown) {
-
- @Override
- protected Boolean executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return ((PreparedStatement) statement).execute();
- }
-
- @Override
- protected Optional<Boolean> getSaneResult(final SQLStatement
sqlStatement, final SQLException ex) {
- return Optional.empty();
- }
- };
- }
-
- private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
- return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
- }
-
@Override
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
@@ -424,12 +365,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return new ExecutionContext(queryContext,
Collections.singletonList(executionUnit), new RouteContext());
}
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
- }
-
private QueryContext createQueryContext() {
List<Object> params = new ArrayList<>(getParameters());
if (sqlStatementContext instanceof ParameterAware) {
@@ -444,16 +379,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
return mergeEngine.merge(queryResults, sqlStatementContext);
}
- private void cacheStatements(final
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws
SQLException {
- for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
- each.getInputs().forEach(eachInput -> {
- statements.add((PreparedStatement)
eachInput.getStorageResource());
-
parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters());
- });
- }
- replay(statements, parameterSets);
- }
-
private void replay(final List<PreparedStatement> statements, final
List<List<Object>> parameterSets) throws SQLException {
replaySetParameter(statements, parameterSets);
for (Statement each : statements) {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 8965c273fb1..6294f9cbe9b 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -34,39 +34,22 @@ import
org.apache.shardingsphere.infra.binder.context.statement.dml.InsertStatem
import
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
-import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
-import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
-import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.hint.SQLHintUtils;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
-import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -85,7 +68,6 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* ShardingSphere statement.
@@ -105,8 +87,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Getter(AccessLevel.PROTECTED)
private final DriverExecutor executor;
- private final KernelProcessor kernelProcessor;
-
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
@@ -134,7 +114,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
statements = new LinkedList<>();
statementOption = new StatementOption(resultSetType,
resultSetConcurrency, resultSetHoldability);
executor = new DriverExecutor(connection);
- kernelProcessor = new KernelProcessor();
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();
@@ -238,11 +217,10 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- boolean isNeedImplicitCommitTransaction =
isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1);
+ clearStatements();
int result = executor.executeUpdate(
metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback,
- (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements), isNeedImplicitCommitTransaction,
executionContext);
+ (StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements));
statements.addAll(executor.getStatements());
replay(statements);
return result;
@@ -309,16 +287,11 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
- Optional<Boolean> advancedResult =
executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback);
- if (advancedResult.isPresent()) {
- return advancedResult.get();
- }
- ExecutionContext executionContext =
createExecutionContext(queryContext);
- if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
- Collection<ExecuteResult> results =
executor.getRawExecutor().execute(createRawExecutionGroupContext(executionContext),
queryContext, new RawSQLExecutorCallback());
- return results.iterator().next() instanceof QueryResult;
- }
- return executeWithExecutionContext(executeCallback, executionContext);
+ clearStatements();
+ boolean result =
executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback,
+ executeCallback, (StatementReplayCallback<Statement>)
(statements, parameterSets) -> replay(statements));
+ statements.addAll(executor.getStatements());
+ return result;
}
private void handleAutoCommit(final SQLStatement sqlStatement) throws
SQLException {
@@ -332,6 +305,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
each.close();
}
statements.clear();
+ executor.clear();
}
@Override
@@ -358,67 +332,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return new QueryContext(sqlStatementContext, sql,
Collections.emptyList(), hintValueContext);
}
- private ExecutionContext createExecutionContext(final QueryContext
queryContext) throws SQLException {
- clearStatements();
- RuleMetaData globalRuleMetaData =
metaDataContexts.getMetaData().getGlobalRuleMetaData();
- ShardingSphereDatabase currentDatabase =
metaDataContexts.getMetaData().getDatabase(databaseName);
- SQLAuditEngine.audit(queryContext.getSqlStatementContext(),
queryContext.getParameters(), globalRuleMetaData, currentDatabase, null,
queryContext.getHintValueContext());
- return kernelProcessor.generateExecutionContext(queryContext,
currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(),
-
connection.getDatabaseConnectionManager().getConnectionContext());
- }
-
- private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
- return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
- }
-
- private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
- }
-
- private boolean executeWithExecutionContext(final ExecuteCallback
executeCallback, final ExecutionContext executionContext) throws SQLException {
- return isNeedImplicitCommitTransaction(connection,
sqlStatementContext.getSqlStatement(),
executionContext.getExecutionUnits().size() > 1)
- ? executeWithImplicitCommitTransaction(() ->
useDriverToExecute(executeCallback, executionContext), connection,
-
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType())
- : useDriverToExecute(executeCallback, executionContext);
- }
-
- private boolean useDriverToExecute(final ExecuteCallback callback, final
ExecutionContext executionContext) throws SQLException {
- ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext =
createExecutionGroupContext(executionContext);
- cacheStatements(executionGroupContext.getInputGroups());
- JDBCExecutorCallback<Boolean> jdbcExecutorCallback =
createExecuteCallback(callback, sqlStatementContext.getSqlStatement());
- return executor.getRegularExecutor().execute(executionGroupContext,
- executionContext.getQueryContext(),
executionContext.getRouteContext().getRouteUnits(), jdbcExecutorCallback);
- }
-
- private void cacheStatements(final
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws
SQLException {
- for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) {
-
statements.addAll(each.getInputs().stream().map(JDBCExecutionUnit::getStorageResource).collect(Collectors.toList()));
- }
- replay(statements);
- }
-
- private JDBCExecutorCallback<Boolean> createExecuteCallback(final
ExecuteCallback executeCallback, final SQLStatement sqlStatement) {
- boolean isExceptionThrown =
SQLExecutorExceptionHandler.isExceptionThrown();
- return new
JDBCExecutorCallback<Boolean>(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, isExceptionThrown) {
-
- @Override
- protected Boolean executeSQL(final String sql, final Statement
statement, final ConnectionMode connectionMode, final DatabaseType storageType)
throws SQLException {
- return executeCallback.execute(sql, statement);
- }
-
- @Override
- protected Optional<Boolean> getSaneResult(final SQLStatement
sqlStatement1, final SQLException ex) {
- return Optional.empty();
- }
- };
- }
-
private void replay(final List<Statement> statements) throws SQLException {
for (Statement each : statements) {
getMethodInvocationRecorder().replay(each);