This is an automated email from the ASF dual-hosted git repository.
zichaowang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c84a3d278ea Refactor TrafficExecutorCallback (#31522)
c84a3d278ea is described below
commit c84a3d278eaf51ee48ae16df0ae0aa9d1118cc21
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Jun 2 03:21:19 2024 +0800
Refactor TrafficExecutorCallback (#31522)
---
.../driver/executor/DriverExecutor.java | 12 +++++-----
.../statement/ShardingSpherePreparedStatement.java | 6 ++---
.../core/statement/ShardingSphereStatement.java | 26 ++++++++++------------
.../traffic/executor/TrafficExecutor.java | 2 +-
.../traffic/executor/TrafficExecutorCallback.java | 4 ++--
5 files changed, 23 insertions(+), 27 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 8d2430f191c..42f7ebc8691 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -164,7 +164,7 @@ public final class DriverExecutor implements AutoCloseable {
}
private TrafficExecutorCallback<ResultSet>
getTrafficExecuteQueryCallback(final String jdbcDriverType) {
- return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ?
Statement::executeQuery : ((statement, sql) -> ((PreparedStatement)
statement).executeQuery());
+ return JDBCDriverType.STATEMENT.equals(jdbcDriverType) ? ((sql,
statement) -> statement.executeQuery(sql)) : ((sql, statement) ->
((PreparedStatement) statement).executeQuery());
}
private ExecuteQueryCallback getExecuteQueryCallback(final
ShardingSphereDatabase database, final QueryContext queryContext, final String
jdbcDriverType) {
@@ -257,7 +257,6 @@ public final class DriverExecutor implements AutoCloseable {
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
- * @param trafficCallback traffic callback
* @param updateCallback update callback
* @param statementReplayCallback statement replay callback
* @return updated row count
@@ -265,11 +264,11 @@ public final class DriverExecutor implements
AutoCloseable {
*/
@SuppressWarnings("rawtypes")
public int executeUpdate(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Integer> trafficCallback,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final ExecuteUpdateCallback updateCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
- return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback);
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
updateCallback::executeUpdate);
}
ExecutionContext executionContext = createExecutionContext(database,
queryContext);
return
database.getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty()
@@ -360,7 +359,6 @@ public final class DriverExecutor implements AutoCloseable {
* @param database database
* @param queryContext query context
* @param prepareEngine prepare engine
- * @param trafficCallback traffic callback
* @param executeCallback execute callback
* @param statementReplayCallback statement replay callback
* @return execute result
@@ -368,12 +366,12 @@ public final class DriverExecutor implements
AutoCloseable {
*/
@SuppressWarnings("rawtypes")
public boolean executeAdvance(final ShardingSphereDatabase database, final
QueryContext queryContext,
- final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final TrafficExecutorCallback<Boolean> trafficCallback,
+ final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final ExecuteCallback executeCallback, final
StatementReplayCallback statementReplayCallback) throws SQLException {
Optional<String> trafficInstanceId =
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
queryContext);
if (trafficInstanceId.isPresent()) {
executeType = ExecuteType.TRAFFIC;
- return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
trafficCallback);
+ return trafficExecutor.execute(connection.getProcessId(),
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine,
executeCallback::execute);
}
if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(),
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
executeType = ExecuteType.FEDERATION;
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 198fe9bb813..a3dfffaa2fa 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -261,7 +261,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
final int result = executor.executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database),
- (statement, sql) -> ((PreparedStatement)
statement).executeUpdate(), null, (StatementReplayCallback<PreparedStatement>)
this::replay);
+ (sql, statement) -> ((PreparedStatement)
statement).executeUpdate(), (StatementReplayCallback<PreparedStatement>)
this::replay);
for (Statement each : executor.getStatements()) {
statements.add((PreparedStatement) each);
}
@@ -289,8 +289,8 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
QueryContext queryContext = createQueryContext();
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
ShardingSphereDatabase database =
metaData.getDatabase(databaseName);
- final boolean result = executor.executeAdvance(database,
queryContext, createDriverExecutionPrepareEngine(database), (statement, sql) ->
((PreparedStatement) statement).execute(),
- null, (StatementReplayCallback<PreparedStatement>)
this::replay);
+ final boolean result = executor.executeAdvance(database,
queryContext, createDriverExecutionPrepareEngine(database), (sql, statement) ->
((PreparedStatement) statement).execute(),
+ (StatementReplayCallback<PreparedStatement>) this::replay);
for (Statement each : executor.getStatements()) {
statements.add((PreparedStatement) each);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 7cb715a0b5f..2437e7a54dc 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -55,7 +55,6 @@ import
org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
import java.sql.Connection;
@@ -151,7 +150,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
- return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL), Statement::executeUpdate);
+ return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -168,8 +167,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
returnGeneratedKeys = true;
}
try {
- return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys),
- (statement, actualSQL) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys));
+ return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, autoGeneratedKeys));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -184,7 +182,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final int[] columnIndexes)
throws SQLException {
returnGeneratedKeys = true;
try {
- return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnIndexes), (statement, actualSQL) ->
statement.executeUpdate(actualSQL, columnIndexes));
+ return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnIndexes));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -199,7 +197,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public int executeUpdate(final String sql, final String[] columnNames)
throws SQLException {
returnGeneratedKeys = true;
try {
- return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnNames), (statement, actualSQL) ->
statement.executeUpdate(actualSQL, columnNames));
+ return executeUpdate(sql, (actualSQL, statement) ->
statement.executeUpdate(actualSQL, columnNames));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -210,7 +208,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private int executeUpdate(final String sql, final ExecuteUpdateCallback
updateCallback, final TrafficExecutorCallback<Integer> trafficCallback) throws
SQLException {
+ private int executeUpdate(final String sql, final ExecuteUpdateCallback
updateCallback) throws SQLException {
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
databaseName =
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
@@ -218,7 +216,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
clearStatements();
- int result = executor.executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback, updateCallback,
+ int result = executor.executeUpdate(database, queryContext,
createDriverExecutionPrepareEngine(database), updateCallback,
(StatementReplayCallback<Statement>) (statements,
parameterSets) -> replay(statements));
statements.addAll(executor.getStatements());
replay(statements);
@@ -228,7 +226,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Override
public boolean execute(final String sql) throws SQLException {
try {
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL), Statement::execute);
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -243,7 +241,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
returnGeneratedKeys = true;
}
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, autoGeneratedKeys), (statement, actualSQL) ->
statement.execute(actualSQL, autoGeneratedKeys));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, autoGeneratedKeys));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -256,7 +254,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public boolean execute(final String sql, final int[] columnIndexes) throws
SQLException {
try {
returnGeneratedKeys = true;
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnIndexes), (statement, actualSQL) ->
statement.execute(actualSQL, columnIndexes));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnIndexes));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -269,7 +267,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
public boolean execute(final String sql, final String[] columnNames)
throws SQLException {
try {
returnGeneratedKeys = true;
- return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnNames), (statement, actualSQL) ->
statement.execute(actualSQL, columnNames));
+ return execute0(sql, (actualSQL, statement) ->
statement.execute(actualSQL, columnNames));
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
@@ -278,7 +276,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
}
}
- private boolean execute0(final String sql, final ExecuteCallback
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws
SQLException {
+ private boolean execute0(final String sql, final ExecuteCallback
executeCallback) throws SQLException {
currentResultSet = null;
QueryContext queryContext = createQueryContext(sql);
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
@@ -287,7 +285,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
ShardingSphereDatabase database = metaData.getDatabase(databaseName);
sqlStatementContext = queryContext.getSqlStatementContext();
clearStatements();
- boolean result = executor.executeAdvance(database, queryContext,
createDriverExecutionPrepareEngine(database), trafficCallback,
+ boolean result = executor.executeAdvance(database, queryContext,
createDriverExecutionPrepareEngine(database),
executeCallback, (StatementReplayCallback<Statement>)
(statements, parameterSets) -> replay(statements));
statements.addAll(executor.getStatements());
return result;
diff --git
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
index 7706ac23650..56e42edf713 100644
---
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
+++
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutor.java
@@ -65,7 +65,7 @@ public final class TrafficExecutor implements AutoCloseable {
JDBCExecutionUnit executionUnit =
createTrafficExecutionUnit(processId, databaseName, trafficInstanceId,
queryContext, prepareEngine);
SQLUnit sqlUnit = executionUnit.getExecutionUnit().getSqlUnit();
cacheStatement(sqlUnit.getParameters(),
executionUnit.getStorageResource());
- T result = callback.execute(statement, sqlUnit.getSql());
+ T result = callback.execute(sqlUnit.getSql(), statement);
resultSet = statement.getResultSet();
return result;
}
diff --git
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorCallback.java
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorCallback.java
index bc6a5140d37..e6b25160ef0 100644
---
a/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorCallback.java
+++
b/kernel/traffic/core/src/main/java/org/apache/shardingsphere/traffic/executor/TrafficExecutorCallback.java
@@ -30,10 +30,10 @@ public interface TrafficExecutorCallback<T> {
/**
* Execute.
*
- * @param statement statement
* @param sql SQL
+ * @param statement statement
* @return execution result
* @throws SQLException SQL exception
*/
- T execute(Statement statement, String sql) throws SQLException;
+ T execute(String sql, Statement statement) throws SQLException;
}