This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ed15cf9b05f Refactor ShardingSpherePreparedStatement (#31406)
ed15cf9b05f is described below
commit ed15cf9b05fa1a113df280df422215e1c2d3f6f5
Author: Liang Zhang <[email protected]>
AuthorDate: Sun May 26 23:11:51 2024 +0800
Refactor ShardingSpherePreparedStatement (#31406)
* Remove ShardingSpherePreparedStatement
* Remove ShardingSpherePreparedStatement
* Remove ShardingSpherePreparedStatement
* Remove ShardingSpherePreparedStatement
* Remove ShardingSpherePreparedStatement
* Remove ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
* Refactor ShardingSpherePreparedStatement
---
.../statement/ShardingSpherePreparedStatement.java | 51 ++++++++++----------
.../core/statement/ShardingSphereStatement.java | 54 +++++++++-------------
.../traffic/executor/TrafficExecutor.java | 41 ++++++++++++++++
.../traffic/executor/TrafficExecutorTest.java | 49 --------------------
4 files changed, 91 insertions(+), 104 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 227cfc2fbe9..b7dc4c334e5 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -41,7 +41,6 @@ import
org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
-import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
import
org.apache.shardingsphere.infra.exception.kernel.syntax.EmptySQLException;
import org.apache.shardingsphere.infra.executor.audit.SQLAuditEngine;
@@ -227,14 +226,15 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
}
clearPrevious();
QueryContext queryContext = createQueryContext();
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- currentResultSet =
executor.getTrafficExecutor().execute(executionUnit, (statement, sql) ->
((PreparedStatement) statement).executeQuery());
+ currentResultSet =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
+ trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeQuery());
return currentResultSet;
}
- if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
+ if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
currentResultSet = executeFederationQuery(queryContext);
return currentResultSet;
}
@@ -266,19 +266,19 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
return
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, globalRuleMetaData);
}
- private void handleAutoCommit(final QueryContext queryContext) throws
SQLException {
- if
(AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement()))
{
+ private void handleAutoCommit(final SQLStatement sqlStatement) throws
SQLException {
+ if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connection.handleAutoCommit();
}
}
private JDBCExecutionUnit createTrafficExecutionUnit(final String
trafficInstanceId, final QueryContext queryContext) throws SQLException {
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
-
ShardingSpherePreconditions.checkState(!context.getInputGroups().isEmpty() &&
!context.getInputGroups().iterator().next().getInputs().isEmpty(),
EmptyTrafficExecutionUnitException::new);
- return
context.getInputGroups().iterator().next().getInputs().iterator().next();
+ return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
}
private Optional<String> getInstanceIdAndSet(final QueryContext
queryContext) {
@@ -319,17 +319,17 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
}
private ResultSet executeFederationQuery(final QueryContext queryContext) {
- PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
+ database.getResourceMetaData(), sqlStatement,
SQLExecutorExceptionHandler.isExceptionThrown());
SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
- return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
+ return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
callback, context);
}
- private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
+ private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
- return new
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager,
- statementOption,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
+ return new
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager, statementOption,
+ database.getRuleMetaData().getRules(),
database.getResourceMetaData().getStorageUnits());
}
@Override
@@ -341,11 +341,12 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
}
clearPrevious();
QueryContext queryContext = createQueryContext();
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
(statement, sql) -> ((PreparedStatement) statement).executeUpdate());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ return
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
+ trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).executeUpdate());
}
executionContext = createExecutionContext(queryContext);
if (hasRawExecutionRule()) {
@@ -405,11 +406,12 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
}
clearPrevious();
QueryContext queryContext = createQueryContext();
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- boolean result =
executor.getTrafficExecutor().execute(executionUnit, (statement, sql) ->
((PreparedStatement) statement).execute());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ boolean result =
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
+ trafficInstanceId, queryContext,
createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute());
currentResultSet =
executor.getTrafficExecutor().getResultSet();
return result;
}
@@ -484,7 +486,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
}
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index abe0ed5e0a5..d2b2e12dfe2 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -46,8 +46,6 @@ import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -70,7 +68,6 @@ import
org.apache.shardingsphere.infra.merge.result.MergedResult;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.route.context.RouteContext;
import
org.apache.shardingsphere.infra.rule.attribute.datanode.DataNodeRuleAttribute;
import
org.apache.shardingsphere.infra.rule.attribute.raw.RawExecutionRuleAttribute;
import org.apache.shardingsphere.infra.session.query.QueryContext;
@@ -80,7 +77,6 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
-import
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
@@ -158,15 +154,17 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
ShardingSpherePreconditions.checkNotEmpty(sql, () -> new
EmptySQLException().toSQLException());
try {
QueryContext queryContext = createQueryContext(sql);
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- currentResultSet =
executor.getTrafficExecutor().execute(createTrafficExecutionUnit(trafficInstanceId,
queryContext), Statement::executeQuery);
+ currentResultSet = executor.getTrafficExecutor().execute(
+ connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
Statement::executeQuery);
return currentResultSet;
}
- if (decide(queryContext,
metaDataContexts.getMetaData().getDatabase(databaseName),
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
+ if (decide(queryContext, database,
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
currentResultSet = executeFederationQuery(queryContext);
return currentResultSet;
}
@@ -226,18 +224,17 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
private ResultSet executeFederationQuery(final QueryContext queryContext) {
- StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
- SQLExecutorExceptionHandler.isExceptionThrown());
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(database.getProtocolType(),
+ database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
- return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
+ return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
callback, context);
}
- private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine() {
+ private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT,
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(),
statementManager, statementOption,
-
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules(),
-
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData().getStorageUnits());
+ database.getRuleMetaData().getRules(),
database.getResourceMetaData().getStorageUnits());
}
@Override
@@ -311,13 +308,14 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private int executeUpdate0(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
QueryContext queryContext = createQueryContext(sql);
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- return executor.getTrafficExecutor().execute(executionUnit,
trafficCallback);
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ return executor.getTrafficExecutor().execute(
+ connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
trafficCallback);
}
executionContext = createExecutionContext(queryContext);
if
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
{
@@ -416,13 +414,14 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private boolean execute0(final String sql, final ExecuteCallback
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws
SQLException {
QueryContext queryContext = createQueryContext(sql);
- handleAutoCommit(queryContext);
+
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
String trafficInstanceId =
getInstanceIdAndSet(queryContext).orElse(null);
if (null != trafficInstanceId) {
- JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(trafficInstanceId, queryContext);
- boolean result =
executor.getTrafficExecutor().execute(executionUnit, trafficCallback);
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ boolean result = executor.getTrafficExecutor().execute(
+ connection.getProcessId(), databaseName,
trafficInstanceId, queryContext, createDriverExecutionPrepareEngine(database),
trafficCallback);
currentResultSet = executor.getTrafficExecutor().getResultSet();
return result;
}
@@ -440,20 +439,12 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
return executeWithExecutionContext(executeCallback, executionContext);
}
- private void handleAutoCommit(final QueryContext queryContext) throws
SQLException {
- if
(AutoCommitUtils.needOpenTransaction(queryContext.getSqlStatementContext().getSqlStatement()))
{
+ private void handleAutoCommit(final SQLStatement sqlStatement) throws
SQLException {
+ if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
connection.handleAutoCommit();
}
}
- private JDBCExecutionUnit createTrafficExecutionUnit(final String
trafficInstanceId, final QueryContext queryContext) throws SQLException {
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
- ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
- ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
- return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
- }
-
private void clearStatements() throws SQLException {
for (Statement each : statements) {
each.close();
@@ -495,7 +486,8 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
- DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
+ ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
+ DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(database);
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
}
diff --git
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index b349a280cb1..fddefd4740f 100644
---
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -18,13 +18,23 @@
package org.apache.shardingsphere.traffic.executor;
import lombok.Getter;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
+import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
+import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
+import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.route.context.RouteContext;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.List;
/**
@@ -54,6 +64,37 @@ public final class TrafficExecutor implements AutoCloseable {
return result;
}
+ /**
+ * Execute.
+ *
+ * @param processId process ID
+ * @param databaseName database name
+ * @param trafficInstanceId traffic instance ID
+ * @param queryContext query context
+ * @param prepareEngine prepare engine
+ * @param callback callback
+ * @param <T> return type
+ * @return execute result
+ * @throws SQLException SQL exception
+ */
+ public <T> T execute(final String processId, final String databaseName,
final String trafficInstanceId, final QueryContext queryContext,
+ final DriverExecutionPrepareEngine<JDBCExecutionUnit,
Connection> prepareEngine, final TrafficExecutorCallback<T> callback) throws
SQLException {
+ JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(processId, databaseName, trafficInstanceId,
queryContext, prepareEngine);
+ SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
+ cacheStatement(sqlUnit.getParameters(),
executionUnit.getStorageResource());
+ T result = callback.execute(statement, sqlUnit.getSql());
+ resultSet = statement.getResultSet();
+ return result;
+ }
+
+ private JDBCExecutionUnit createTrafficExecutionUnit(final String
processId, final String databaseName, final String trafficInstanceId, final
QueryContext queryContext,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine)
throws SQLException {
+ ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
+ ExecutionGroupContext<JDBCExecutionUnit> context =
+ prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+ return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
+ }
+
private void cacheStatement(final List<Object> params, final Statement
statement) throws SQLException {
this.statement = statement;
setParameters(statement, params);
diff --git
a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java
b/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java
deleted file mode 100644
index 7074ad28b85..00000000000
---
a/kernel/traffic/core/src/test/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.traffic.executor;
-
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
-import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
-import org.junit.jupiter.api.Test;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-class TrafficExecutorTest {
-
- @Test
- void assertClose() throws SQLException {
- Statement statement = mock(Statement.class, RETURNS_DEEP_STUBS);
- try (TrafficExecutor trafficExecutor = new TrafficExecutor()) {
- JDBCExecutionUnit executionUnit = mock(JDBCExecutionUnit.class);
- when(executionUnit.getExecutionUnit()).thenReturn(new
ExecutionUnit("oltp_proxy_instance_id", new SQLUnit("SELECT 1",
Collections.emptyList())));
- when(executionUnit.getStorageResource()).thenReturn(statement);
- trafficExecutor.execute(executionUnit, Statement::executeQuery);
- }
- verify(statement).close();
- verify(statement, times(0)).getConnection();
- }
-}