This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f1de3fb172 Refactor DriverExecutor (#31418)
6f1de3fb172 is described below

commit 6f1de3fb172794b9a7243d8ba970a30d8eedb74c
Author: Liang Zhang <[email protected]>
AuthorDate: Mon May 27 23:35:40 2024 +0800

    Refactor DriverExecutor (#31418)
    
    * Add ShardingSphereConnection.getTrafficInstanceId()
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
    
    * Refactor DriverExecutor
---
 .../driver/DriverExecutionPrepareEngine.java       |   5 +
 .../driver/executor/DriverExecutor.java            | 150 ++++++++++++++++++++-
 .../statement/ShardingSpherePreparedStatement.java |  68 +++-------
 .../core/statement/ShardingSphereStatement.java    |  59 +++-----
 4 files changed, 184 insertions(+), 98 deletions(-)

diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
index 655c5f8836e..4e097b0dacc 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.executor.sql.prepare.driver;
 
+import lombok.Getter;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
@@ -45,6 +46,9 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
     @SuppressWarnings("rawtypes")
     private static final Map<String, SQLExecutionUnitBuilder> 
TYPE_TO_BUILDER_MAP = new ConcurrentHashMap<>(8, 1F);
     
+    @Getter
+    private final String type;
+    
     private final DatabaseConnectionManager<C> databaseConnectionManager;
     
     private final ExecutorStatementManager<C, ?, ?> statementManager;
@@ -60,6 +64,7 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
                                         final ExecutorStatementManager<C, ?, 
?> statementManager, final StorageResourceOption option, final 
Collection<ShardingSphereRule> rules,
                                         final Map<String, StorageUnit> 
storageUnits) {
         super(maxConnectionsSizePerQuery, rules);
+        this.type = type;
         this.databaseConnectionManager = databaseConnectionManager;
         this.statementManager = statementManager;
         this.option = option;
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
index 510665e241a..ad349f07178 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverExecutor.java
@@ -18,33 +18,56 @@
 package org.apache.shardingsphere.driver.executor;
 
 import lombok.Getter;
+import org.apache.shardingsphere.driver.executor.callback.ExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.impl.PreparedStatementExecuteQueryCallback;
+import 
org.apache.shardingsphere.driver.executor.callback.impl.StatementExecuteQueryCallback;
 import 
org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
+import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawExecutor;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
+import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutor;
+import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
+import org.apache.shardingsphere.traffic.rule.TrafficRule;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Optional;
 
 /**
  * Driver executor.
  */
-@Getter
 public final class DriverExecutor implements AutoCloseable {
     
+    private final ShardingSphereConnection connection;
+    
+    @Getter
     private final DriverJDBCExecutor regularExecutor;
     
+    @Getter
     private final RawExecutor rawExecutor;
     
+    private final TrafficExecutor trafficExecutor;
+    
     private final SQLFederationEngine sqlFederationEngine;
     
-    private final TrafficExecutor trafficExecutor;
+    private ExecuteType executeType = ExecuteType.REGULAR;
     
     public DriverExecutor(final ShardingSphereConnection connection) {
+        this.connection = connection;
         MetaDataContexts metaDataContexts = 
connection.getContextManager().getMetaDataContexts();
         ExecutorEngine executorEngine = 
connection.getContextManager().getExecutorEngine();
         JDBCExecutor jdbcExecutor = new JDBCExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
@@ -52,18 +75,137 @@ public final class DriverExecutor implements AutoCloseable 
{
         rawExecutor = new RawExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext());
         ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName());
         String schemaName = new 
DatabaseTypeRegistry(database.getProtocolType()).getDefaultSchemaName(connection.getDatabaseName());
-        sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, 
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
         trafficExecutor = new TrafficExecutor();
+        sqlFederationEngine = new 
SQLFederationEngine(connection.getDatabaseName(), schemaName, 
metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor);
     }
     
     /**
-     * Close.
+     * Execute advance query.
      *
+     * @param metaData meta data
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @return result set
      * @throws SQLException SQL exception
      */
+    public Optional<ResultSet> executeAdvanceQuery(final 
ShardingSphereMetaData metaData, final ShardingSphereDatabase database, final 
QueryContext queryContext,
+                                                   final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) 
throws SQLException {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            TrafficExecutorCallback<ResultSet> trafficCallback = 
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+                    ? Statement::executeQuery
+                    : ((statement, sql) -> ((PreparedStatement) 
statement).executeQuery());
+            return 
Optional.of(trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
trafficCallback));
+        }
+        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+            ExecuteQueryCallback sqlFederationCallback = 
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+                    ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                            
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
+                    : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
+                            database.getResourceMetaData(), 
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
+            return Optional.of(sqlFederationEngine.executeQuery(prepareEngine, 
sqlFederationCallback, new SQLFederationContext(false, queryContext, metaData, 
connection.getProcessId())));
+        }
+        return Optional.empty();
+    }
+    
+    /**
+     * Execute advance update.
+     *
+     * @param metaData meta data
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @return updated row count
+     * @throws SQLException SQL exception
+     */
+    public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database, final QueryContext 
queryContext,
+                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine) 
throws SQLException {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return 
Optional.of(trafficExecutor.execute(connection.getProcessId(), 
database.getName(),
+                    trafficInstanceId.get(), queryContext, prepareEngine, 
(statement, sql) -> ((PreparedStatement) statement).executeUpdate()));
+        }
+        return Optional.empty();
+    }
+    
+    /**
+     * Execute advance update.
+     *
+     * @param metaData meta data
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param trafficCallback traffic callback
+     * @return updated row count
+     * @throws SQLException SQL exception
+     */
+    public Optional<Integer> executeAdvanceUpdate(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database, final QueryContext 
queryContext,
+                                                  final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                                  final 
TrafficExecutorCallback<Integer> trafficCallback) throws SQLException {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            return 
Optional.of(trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
trafficCallback));
+        }
+        return Optional.empty();
+    }
+    
+    /**
+     * Execute advance.
+     *
+     * @param metaData meta data
+     * @param database database
+     * @param queryContext query context
+     * @param prepareEngine prepare engine
+     * @param trafficCallback traffic callback
+     * @return execute result
+     * @throws SQLException SQL exception
+     */
+    public Optional<Boolean> executeAdvance(final ShardingSphereMetaData 
metaData, final ShardingSphereDatabase database,
+                                            final QueryContext queryContext, 
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
+                                            final 
TrafficExecutorCallback<Boolean> trafficCallback) throws SQLException {
+        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(metaData.getGlobalRuleMetaData().getSingleRule(TrafficRule.class),
 queryContext);
+        if (trafficInstanceId.isPresent()) {
+            executeType = ExecuteType.TRAFFIC;
+            return 
Optional.of(trafficExecutor.execute(connection.getProcessId(), 
database.getName(), trafficInstanceId.get(), queryContext, prepareEngine, 
trafficCallback));
+        }
+        if (sqlFederationEngine.decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, metaData.getGlobalRuleMetaData())) {
+            executeType = ExecuteType.FEDERATION;
+            ExecuteQueryCallback sqlFederationCallback = 
JDBCDriverType.STATEMENT.equals(prepareEngine.getType())
+                    ? new 
StatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                            
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown())
+                    : new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(),
+                            
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
+            ResultSet resultSet = 
sqlFederationEngine.executeQuery(prepareEngine, sqlFederationCallback, new 
SQLFederationContext(false, queryContext, metaData, connection.getProcessId()));
+            return Optional.of(null != resultSet);
+        }
+        return Optional.empty();
+    }
+    
+    /**
+     * Get advanced result set.
+     *
+     * @return advanced result set
+     */
+    public Optional<ResultSet> getAdvancedResultSet() {
+        switch (executeType) {
+            case TRAFFIC:
+                return Optional.of(trafficExecutor.getResultSet());
+            case FEDERATION:
+                return Optional.of(sqlFederationEngine.getResultSet());
+            default:
+                return Optional.empty();
+        }
+    }
+    
     @Override
     public void close() throws SQLException {
         sqlFederationEngine.close();
         trafficExecutor.close();
     }
+    
+    public enum ExecuteType {
+        
+        TRAFFIC, FEDERATION, REGULAR
+    }
 }
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index fecffc9e8e1..35bf260aeda 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -83,8 +83,6 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
-import 
org.apache.shardingsphere.traffic.exception.EmptyTrafficExecutionUnitException;
 import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
 
@@ -226,14 +224,9 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-            if (trafficInstanceId.isPresent()) {
-                currentResultSet = 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeQuery());
-                return currentResultSet;
-            }
-            if (decide(queryContext, database, 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
-                currentResultSet = executeFederationQuery(queryContext);
+            Optional<ResultSet> advancedResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database));
+            if (advancedResultSet.isPresent()) {
+                currentResultSet = advancedResultSet.get();
                 return currentResultSet;
             }
             executionContext = createExecutionContext(queryContext);
@@ -260,25 +253,12 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
         return new ShardingSphereResultSet(resultSets, mergedResult, this, 
selectContainsEnhancedTable, executionContext, columnLabelAndIndexMap);
     }
     
-    private boolean decide(final QueryContext queryContext, final 
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
-        return 
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData);
-    }
-    
     private void handleAutoCommit(final SQLStatement sqlStatement) throws 
SQLException {
         if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
             connection.handleAutoCommit();
         }
     }
     
-    private JDBCExecutionUnit createTrafficExecutionUnit(final String 
trafficInstanceId, final QueryContext queryContext) throws SQLException {
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-        DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(database);
-        ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
-        ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new 
Grantee("", "")));
-        return context.getInputGroups().stream().flatMap(each -> 
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
-    }
-    
     private void resetParameters() throws SQLException {
         parameterSets.clear();
         parameterSets.add(getParameters());
@@ -298,14 +278,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
                         SQLExecutorExceptionHandler.isExceptionThrown()));
     }
     
-    private ResultSet executeFederationQuery(final QueryContext queryContext) {
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-        PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(database.getProtocolType(),
-                database.getResourceMetaData(), sqlStatement, 
SQLExecutorExceptionHandler.isExceptionThrown());
-        SQLFederationContext context = new SQLFederationContext(false, 
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
-        return 
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
 callback, context);
-    }
-    
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new 
DriverExecutionPrepareEngine<>(JDBCDriverType.PREPARED_STATEMENT, 
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), 
statementManager, statementOption,
@@ -322,11 +294,10 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             clearPrevious();
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
-            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-            if (trafficInstanceId.isPresent()) {
-                ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-                return 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).executeUpdate());
+            ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
+            Optional<Integer> updatedCount = 
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database));
+            if (updatedCount.isPresent()) {
+                return updatedCount.get();
             }
             executionContext = createExecutionContext(queryContext);
             if (hasRawExecutionRule()) {
@@ -387,20 +358,12 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
             clearPrevious();
             QueryContext queryContext = createQueryContext();
             
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
-            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-            if (trafficInstanceId.isPresent()) {
-                ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-                boolean result = 
executor.getTrafficExecutor().execute(connection.getProcessId(), databaseName,
-                        trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).execute());
-                currentResultSet = 
executor.getTrafficExecutor().getResultSet();
-                return result;
-            }
-            if (decide(queryContext, 
metaDataContexts.getMetaData().getDatabase(databaseName), 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
-                ResultSet resultSet = executeFederationQuery(queryContext);
-                currentResultSet = resultSet;
-                return null != resultSet;
+            ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
+            Optional<Boolean> advancedResult = executor.executeAdvance(
+                    metaDataContexts.getMetaData(), database, queryContext, 
createDriverExecutionPrepareEngine(database), (statement, sql) -> 
((PreparedStatement) statement).execute());
+            if (advancedResult.isPresent()) {
+                return advancedResult.get();
             }
-            currentResultSet = null;
             executionContext = createExecutionContext(queryContext);
             if (hasRawExecutionRule()) {
                 Collection<ExecuteResult> results =
@@ -414,8 +377,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             handleExceptionInTransaction(connection, metaDataContexts);
             throw SQLExceptionTransformEngine.toSQLException(ex, 
metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType());
         } finally {
-            batchPreparedStatementExecutor.clear();
-            clearParameters();
+            clearBatch();
         }
     }
     
@@ -477,6 +439,10 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         if (null != currentResultSet) {
             return currentResultSet;
         }
+        Optional<ResultSet> advancedResultSet = 
executor.getAdvancedResultSet();
+        if (advancedResultSet.isPresent()) {
+            return advancedResultSet.get();
+        }
         if (executionContext.getSqlStatementContext() instanceof 
SelectStatementContext || 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
DALStatement) {
             List<ResultSet> resultSets = getResultSets();
             if (resultSets.isEmpty()) {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 9becb70f884..55edd1785d9 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -74,9 +74,7 @@ import 
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
-import org.apache.shardingsphere.traffic.rule.TrafficRule;
 import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
 
 import java.sql.Connection;
@@ -111,8 +109,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     
     private final KernelProcessor kernelProcessor;
     
-    private final TrafficRule trafficRule;
-    
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
     
@@ -141,7 +137,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         statementOption = new StatementOption(resultSetType, 
resultSetConcurrency, resultSetHoldability);
         executor = new DriverExecutor(connection);
         kernelProcessor = new KernelProcessor();
-        trafficRule = 
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
         statementManager = new StatementManager();
         batchStatementExecutor = new BatchStatementExecutor(this);
         databaseName = connection.getDatabaseName();
@@ -156,14 +151,9 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
             databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
             
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-            if (trafficInstanceId.isPresent()) {
-                currentResultSet = executor.getTrafficExecutor().execute(
-                        connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), Statement::executeQuery);
-                return currentResultSet;
-            }
-            if (decide(queryContext, database, 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
-                currentResultSet = executeFederationQuery(queryContext);
+            Optional<ResultSet> advancedResultSet = 
executor.executeAdvanceQuery(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database));
+            if (advancedResultSet.isPresent()) {
+                currentResultSet = advancedResultSet.get();
                 return currentResultSet;
             }
             executionContext = createExecutionContext(queryContext);
@@ -186,10 +176,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return new ShardingSphereResultSet(getResultSets(), mergedResult, 
this, selectContainsEnhancedTable, executionContext);
     }
     
-    private boolean decide(final QueryContext queryContext, final 
ShardingSphereDatabase database, final RuleMetaData globalRuleMetaData) {
-        return 
executor.getSqlFederationEngine().decide(queryContext.getSqlStatementContext(), 
queryContext.getParameters(), database, globalRuleMetaData);
-    }
-    
     private List<QueryResult> executeQuery0(final ExecutionContext 
executionContext) throws SQLException {
         if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
             return executor.getRawExecutor().execute(
@@ -203,14 +189,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         return 
executor.getRegularExecutor().executeQuery(executionGroupContext, 
executionContext.getQueryContext(), callback);
     }
     
-    private ResultSet executeFederationQuery(final QueryContext queryContext) {
-        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-        StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(database.getProtocolType(),
-                database.getResourceMetaData(), 
queryContext.getSqlStatementContext().getSqlStatement(), 
SQLExecutorExceptionHandler.isExceptionThrown());
-        SQLFederationContext context = new SQLFederationContext(false, 
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
-        return 
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(database),
 callback, context);
-    }
-    
     private DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
createDriverExecutionPrepareEngine(final ShardingSphereDatabase database) {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new DriverExecutionPrepareEngine<>(JDBCDriverType.STATEMENT, 
maxConnectionsSizePerQuery, connection.getDatabaseConnectionManager(), 
statementManager, statementOption,
@@ -291,11 +269,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
         databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-        if (trafficInstanceId.isPresent()) {
-            ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            return executor.getTrafficExecutor().execute(
-                    connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback);
+        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
+        Optional<Integer> updatedCount = 
executor.executeAdvanceUpdate(metaDataContexts.getMetaData(), database, 
queryContext, createDriverExecutionPrepareEngine(database), trafficCallback);
+        if (updatedCount.isPresent()) {
+            return updatedCount.get();
         }
         executionContext = createExecutionContext(queryContext);
         if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
@@ -393,24 +370,16 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     }
     
     private boolean execute0(final String sql, final ExecuteCallback 
executeCallback, final TrafficExecutorCallback<Boolean> trafficCallback) throws 
SQLException {
+        currentResultSet = null;
         QueryContext queryContext = createQueryContext(sql);
         
handleAutoCommit(queryContext.getSqlStatementContext().getSqlStatement());
         databaseName = 
queryContext.getDatabaseNameFromSQLStatement().orElse(connection.getDatabaseName());
         
connection.getDatabaseConnectionManager().getConnectionContext().setCurrentDatabase(databaseName);
-        Optional<String> trafficInstanceId = 
connection.getTrafficInstanceId(trafficRule, queryContext);
-        if (trafficInstanceId.isPresent()) {
-            ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
-            boolean result = executor.getTrafficExecutor().execute(
-                    connection.getProcessId(), databaseName, 
trafficInstanceId.get(), queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback);
-            currentResultSet = executor.getTrafficExecutor().getResultSet();
-            return result;
-        }
-        if (decide(queryContext, 
metaDataContexts.getMetaData().getDatabase(databaseName), 
metaDataContexts.getMetaData().getGlobalRuleMetaData())) {
-            ResultSet resultSet = executeFederationQuery(queryContext);
-            currentResultSet = resultSet;
-            return null != resultSet;
+        ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
+        Optional<Boolean> advancedResult = 
executor.executeAdvance(metaDataContexts.getMetaData(), database, queryContext, 
createDriverExecutionPrepareEngine(database), trafficCallback);
+        if (advancedResult.isPresent()) {
+            return advancedResult.get();
         }
-        currentResultSet = null;
         executionContext = createExecutionContext(queryContext);
         if 
(!metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getAttributes(RawExecutionRuleAttribute.class).isEmpty())
 {
             Collection<ExecuteResult> results = 
executor.getRawExecutor().execute(createRawExecutionContext(executionContext), 
executionContext.getQueryContext(), new RawSQLExecutorCallback());
@@ -528,6 +497,10 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         if (null != currentResultSet) {
             return currentResultSet;
         }
+        Optional<ResultSet> advancedResultSet = 
executor.getAdvancedResultSet();
+        if (advancedResultSet.isPresent()) {
+            return advancedResultSet.get();
+        }
         if (executionContext.getSqlStatementContext() instanceof 
SelectStatementContext || 
executionContext.getSqlStatementContext().getSqlStatement() instanceof 
DALStatement) {
             List<ResultSet> resultSets = getResultSets();
             if (resultSets.isEmpty()) {


Reply via email to