This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c0783fd074 Fix Operation not allowed after ResultSet closed exception 
when use sql federation (#35206)
8c0783fd074 is described below

commit 8c0783fd0746ea7346cf01bde338ac59f0c553ff
Author: Zhengqiang Duan <duanzhengqi...@apache.org>
AuthorDate: Thu Apr 17 15:18:45 2025 +0800

    Fix Operation not allowed after ResultSet closed exception when use sql 
federation (#35206)
---
 RELEASE-NOTES.md                                   |  1 +
 .../driver/DriverExecutionPrepareEngine.java       |  7 +++---
 .../prepare/driver/ExecutorStatementManager.java   |  3 ++-
 .../prepare/driver/SQLExecutionUnitBuilder.java    |  3 ++-
 .../PreparedStatementExecutionUnitBuilder.java     | 11 +++++----
 .../builder/StatementExecutionUnitBuilder.java     |  4 ++--
 .../jdbc/core/statement/StatementManager.java      |  8 ++++---
 .../sqlfederation/engine/SQLFederationEngine.java  | 24 +++++++++++--------
 .../engine/processor/SQLFederationProcessor.java   | 10 ++++++++
 .../impl/StandardSQLFederationProcessor.java       | 26 +++++++++++++++++----
 .../metadata/schema/SQLFederationTable.java        | 27 +++++++++++++++++-----
 .../jdbc/statement/JDBCBackendStatement.java       |  4 ++--
 .../bind/OpenGaussComBatchBindExecutorTest.java    |  2 +-
 ...egatedBatchedStatementsCommandExecutorTest.java |  2 +-
 .../PostgreSQLBatchedStatementsExecutorTest.java   |  2 +-
 15 files changed, 94 insertions(+), 40 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 5db48b9c340..5b452029f5c 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -44,6 +44,7 @@
 1. JDBC: Fix getting database name from sql statement context - 
[#34960](https://github.com/apache/shardingsphere/pull/34960)
 1. DistSQL: Fix duplicate result when show rules used storage unit with 
readwrite-splitting rule - 
[#35129](https://github.com/apache/shardingsphere/pull/35129)
 1. Transaction: Fix conflicting dependencies of BASE transaction integration 
module - [#35142](https://github.com/apache/shardingsphere/pull/35142)
+1. SQL Federation: Fix Operation not allowed after ResultSet closed exception 
when use sql federation - 
[#35206](https://github.com/apache/shardingsphere/pull/35206)
 
 ### Change Logs
 
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
index 463404ec0e3..00b8c6ed609 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java
@@ -94,18 +94,19 @@ public final class DriverExecutionPrepareEngine<T extends 
DriverExecutionUnit<?>
         List<C> connections = 
databaseConnectionManager.getConnections(databaseName, dataSourceName, 
connectionOffset, executionUnitGroups.size(), connectionMode);
         int count = 0;
         for (List<ExecutionUnit> each : executionUnitGroups) {
-            result.add(createExecutionGroup(dataSourceName, each, 
connections.get(count++), connectionMode));
+            result.add(createExecutionGroup(dataSourceName, each, 
connections.get(count++), connectionOffset, connectionMode));
         }
         return result;
     }
     
     @SuppressWarnings("unchecked")
-    private ExecutionGroup<T> createExecutionGroup(final String 
dataSourceName, final List<ExecutionUnit> executionUnits, final C connection, 
final ConnectionMode connectionMode) throws SQLException {
+    private ExecutionGroup<T> createExecutionGroup(final String 
dataSourceName, final List<ExecutionUnit> executionUnits, final C connection, 
final int connectionOffset,
+                                                   final ConnectionMode 
connectionMode) throws SQLException {
         List<T> inputs = new LinkedList<>();
         // TODO use metadata to replace storageUnits to support multiple logic 
databases
         DatabaseType databaseType = storageUnits.containsKey(dataSourceName) ? 
storageUnits.get(dataSourceName).getStorageType() : 
storageUnits.values().iterator().next().getStorageType();
         for (ExecutionUnit each : executionUnits) {
-            inputs.add((T) sqlExecutionUnitBuilder.build(each, 
statementManager, connection, connectionMode, option, databaseType));
+            inputs.add((T) sqlExecutionUnitBuilder.build(each, 
statementManager, connection, connectionOffset, connectionMode, option, 
databaseType));
         }
         return new ExecutionGroup<>(inputs);
     }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
index 247580cbfe7..8f522487ecd 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java
@@ -49,11 +49,12 @@ public interface ExecutorStatementManager<C, R, O extends 
StorageResourceOption>
      *
      * @param executionUnit execution unit
      * @param connection connection
+     * @param connectionOffset connection offset
      * @param connectionMode connection mode
      * @param option storage resource option
      * @param databaseType database type
      * @return storage resource
      * @throws SQLException SQL exception
      */
-    R createStorageResource(ExecutionUnit executionUnit, C connection, 
ConnectionMode connectionMode, O option, DatabaseType databaseType) throws 
SQLException;
+    R createStorageResource(ExecutionUnit executionUnit, C connection, int 
connectionOffset, ConnectionMode connectionMode, O option, DatabaseType 
databaseType) throws SQLException;
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
index 813f93cced6..8be0f7c6b83 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java
@@ -43,11 +43,12 @@ public interface SQLExecutionUnitBuilder<T extends 
DriverExecutionUnit<?>, M ext
      * @param executionUnit execution unit
      * @param executorManager executor manager 
      * @param connection connection
+     * @param connectionOffset connection offset
      * @param connectionMode connection mode
      * @param option storage resource option
      * @param databaseType database type
      * @return SQL execution unit
      * @throws SQLException SQL exception
      */
-    T build(ExecutionUnit executionUnit, M executorManager, C connection, 
ConnectionMode connectionMode, O option, DatabaseType databaseType) throws 
SQLException;
+    T build(ExecutionUnit executionUnit, M executorManager, C connection, int 
connectionOffset, ConnectionMode connectionMode, O option, DatabaseType 
databaseType) throws SQLException;
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
index 3f9ce8658d7..43b2f5690ba 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
@@ -35,16 +35,17 @@ import java.sql.SQLException;
 public final class PreparedStatementExecutionUnitBuilder implements 
JDBCExecutionUnitBuilder {
     
     @Override
-    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager,
-                                   final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option, final DatabaseType 
databaseType) throws SQLException {
+    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager, final Connection connection,
+                                   final int connectionOffset, final 
ConnectionMode connectionMode, final StatementOption option, final DatabaseType 
databaseType) throws SQLException {
         PreparedStatement preparedStatement = createPreparedStatement(
-                executionUnit, statementManager, connection, connectionMode, 
option, databaseType);
+                executionUnit, statementManager, connection, connectionOffset, 
connectionMode, option, databaseType);
         return new JDBCExecutionUnit(executionUnit, connectionMode, 
preparedStatement);
     }
     
     private PreparedStatement createPreparedStatement(final ExecutionUnit 
executionUnit, final ExecutorJDBCStatementManager statementManager, final 
Connection connection,
-                                                      final ConnectionMode 
connectionMode, final StatementOption option, final DatabaseType databaseType) 
throws SQLException {
-        return (PreparedStatement) 
statementManager.createStorageResource(executionUnit, connection, 
connectionMode, option, databaseType);
+                                                      final int 
connectionOffset, final ConnectionMode connectionMode, final StatementOption 
option,
+                                                      final DatabaseType 
databaseType) throws SQLException {
+        return (PreparedStatement) 
statementManager.createStorageResource(executionUnit, connection, 
connectionOffset, connectionMode, option, databaseType);
     }
     
     @Override
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
index 99df2035b57..ef6b3529d58 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java
@@ -35,8 +35,8 @@ import java.sql.Statement;
 public final class StatementExecutionUnitBuilder implements 
JDBCExecutionUnitBuilder {
     
     @Override
-    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager,
-                                   final Connection connection, final 
ConnectionMode connectionMode, final StatementOption option, final DatabaseType 
databaseType) throws SQLException {
+    public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final 
ExecutorJDBCStatementManager statementManager, final Connection connection,
+                                   final int connectionOffset, final 
ConnectionMode connectionMode, final StatementOption option, final DatabaseType 
databaseType) throws SQLException {
         return new JDBCExecutionUnit(executionUnit, connectionMode, 
createStatement(statementManager, connection, connectionMode, option, 
databaseType));
     }
     
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
index 58b15d456db..185cd6bdf9b 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
@@ -50,9 +50,9 @@ public final class StatementManager implements 
ExecutorJDBCStatementManager, Aut
     }
     
     @Override
-    public Statement createStorageResource(final ExecutionUnit executionUnit, 
final Connection connection, final ConnectionMode connectionMode, final 
StatementOption option,
-                                           final DatabaseType databaseType) 
throws SQLException {
-        CacheKey cacheKey = new CacheKey(executionUnit, connectionMode);
+    public Statement createStorageResource(final ExecutionUnit executionUnit, 
final Connection connection, final int connectionOffset,
+                                           final ConnectionMode 
connectionMode, final StatementOption option, final DatabaseType databaseType) 
throws SQLException {
+        CacheKey cacheKey = new CacheKey(executionUnit, connectionMode, 
connectionOffset);
         Statement result = cachedStatements.get(cacheKey);
         if (null == result || result.getConnection().isClosed() || 
result.isClosed()) {
             Optional.ofNullable(result).ifPresent(optional -> 
cachedStatements.remove(cacheKey));
@@ -113,5 +113,7 @@ public final class StatementManager implements 
ExecutorJDBCStatementManager, Aut
         private final ExecutionUnit executionUnit;
         
         private final ConnectionMode connectionMode;
+        
+        private final int connectionOffset;
     }
 }
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index f5f6a460b01..dcf54e7e01b 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -95,13 +95,13 @@ public final class SQLFederationEngine implements 
AutoCloseable {
     
     private final String currentSchemaName;
     
-    private final ShardingSphereMetaData metaData;
+    private final SQLFederationRule sqlFederationRule;
     
-    private final ShardingSphereStatistics statistics;
+    private final SQLFederationProcessor processor;
     
-    private final JDBCExecutor jdbcExecutor;
+    private QueryContext queryContext;
     
-    private final SQLFederationRule sqlFederationRule;
+    private SchemaPlus schemaPlus;
     
     private ResultSet resultSet;
     
@@ -110,10 +110,8 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, 
metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules());
         this.currentDatabaseName = currentDatabaseName;
         this.currentSchemaName = currentSchemaName;
-        this.metaData = metaData;
-        this.statistics = statistics;
-        this.jdbcExecutor = jdbcExecutor;
         sqlFederationRule = 
metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class);
+        processor = 
SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, 
jdbcExecutor);
     }
     
     /**
@@ -184,18 +182,17 @@ public final class SQLFederationEngine implements 
AutoCloseable {
      */
     public ResultSet executeQuery(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final JDBCExecutorCallback<? extends ExecuteResult> callback,
                                   final SQLFederationContext 
federationContext) {
-        QueryContext queryContext = federationContext.getQueryContext();
+        queryContext = federationContext.getQueryContext();
         try {
             
ShardingSpherePreconditions.checkState(queryContext.getSqlStatementContext() 
instanceof SelectStatementContext,
                     () -> new IllegalArgumentException("SQL statement must be 
select statement in sql federation engine."));
             SelectStatementContext selectStatementContext = 
(SelectStatementContext) queryContext.getSqlStatementContext();
             String databaseName = 
selectStatementContext.getTablesContext().getDatabaseNames().stream().findFirst().orElse(currentDatabaseName);
             String schemaName = 
selectStatementContext.getTablesContext().getSchemaName().orElse(currentSchemaName);
-            SQLFederationProcessor processor = 
SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, 
jdbcExecutor);
             SqlToRelConverter converter = 
creeateSQLToRelConverter(databaseName, schemaName, 
selectStatementContext.getDatabaseType(), processor.getConvention());
             SQLFederationExecutionPlan executionPlan = compileQuery(converter, 
databaseName, schemaName,
                     federationContext.getMetaData(), selectStatementContext, 
queryContext.getSql(), processor.getConvention());
-            SchemaPlus schemaPlus = getSqlFederationSchema(converter, 
schemaName, queryContext.getSql());
+            schemaPlus = getSqlFederationSchema(converter, schemaName, 
queryContext.getSql());
             processor.registerExecutor(prepareEngine, callback, databaseName, 
schemaName, federationContext, sqlFederationRule.getOptimizerContext(), 
schemaPlus);
             resultSet = processor.executePlan(prepareEngine, callback, 
executionPlan, converter, federationContext, schemaPlus);
             return resultSet;
@@ -249,6 +246,7 @@ public final class SQLFederationEngine implements 
AutoCloseable {
     public void close() throws SQLException {
         Collection<SQLException> result = new LinkedList<>();
         closeResultSet().ifPresent(result::add);
+        unregisterExecutor();
         if (result.isEmpty()) {
             return;
         }
@@ -267,4 +265,10 @@ public final class SQLFederationEngine implements 
AutoCloseable {
         }
         return Optional.empty();
     }
+    
+    private void unregisterExecutor() {
+        if (null != queryContext && null != schemaPlus) {
+            processor.unregisterExecutor(queryContext, schemaPlus);
+        }
+    }
 }
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
index b93ea2a055e..680d261cfd3 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
 import 
org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan;
 import 
org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
@@ -51,6 +52,15 @@ public interface SQLFederationProcessor {
                                   String databaseName, String schemaName, 
SQLFederationContext federationContext, OptimizerContext optimizerContext, 
SchemaPlus schemaPlus) {
     }
     
+    /**
+     * Unregister executor.
+     *
+     * @param queryContext query context
+     * @param schemaPlus sql federation schema
+     */
+    default void unregisterExecutor(QueryContext queryContext, SchemaPlus 
schemaPlus) {
+    }
+    
     /**
      * Execute plan.
      *
diff --git 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
index 90463b4100d..d3938d715d5 100644
--- 
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
+++ 
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java
@@ -29,14 +29,16 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql2rel.SqlToRelConverter;
 import 
org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.binder.context.type.TableAvailable;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import 
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
+import 
org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment;
 import 
org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext;
 import 
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
@@ -49,6 +51,7 @@ import 
org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -76,15 +79,30 @@ public final class StandardSQLFederationProcessor 
implements SQLFederationProces
         SQLFederationExecutorContext executorContext = new 
SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps());
         EnumerableScanExecutor scanExecutor =
                 new EnumerableScanExecutor(prepareEngine, jdbcExecutor, 
callback, optimizerContext, executorContext, federationContext, 
metaData.getGlobalRuleMetaData(), statistics);
-        // TODO register only the required tables
-        for (ShardingSphereTable each : 
metaData.getDatabase(databaseName).getSchema(schemaName).getAllTables()) {
-            Table table = schemaPlus.tables().get(each.getName());
+        Collection<SimpleTableSegment> simpleTables = 
federationContext.getQueryContext().getSqlStatementContext() instanceof 
TableAvailable
+                ? ((TableAvailable) 
federationContext.getQueryContext().getSqlStatementContext()).getTablesContext().getSimpleTables()
+                : Collections.emptyList();
+        for (SimpleTableSegment each : simpleTables) {
+            Table table = 
schemaPlus.tables().get(each.getTableName().getIdentifier().getValue());
             if (table instanceof SQLFederationTable) {
                 ((SQLFederationTable) table).setScanExecutor(scanExecutor);
             }
         }
     }
     
+    @Override
+    public void unregisterExecutor(final QueryContext queryContext, final 
SchemaPlus schemaPlus) {
+        Collection<SimpleTableSegment> simpleTables = 
queryContext.getSqlStatementContext() instanceof TableAvailable
+                ? ((TableAvailable) 
queryContext.getSqlStatementContext()).getTablesContext().getSimpleTables()
+                : Collections.emptyList();
+        for (SimpleTableSegment each : simpleTables) {
+            Table table = 
schemaPlus.tables().get(each.getTableName().getIdentifier().getValue());
+            if (table instanceof SQLFederationTable) {
+                ((SQLFederationTable) table).clearScanExecutor();
+            }
+        }
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public ResultSet executePlan(final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, 
final JDBCExecutorCallback<? extends ExecuteResult> callback,
diff --git 
a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
 
b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
index 760adcca8f3..20f84cd324f 100644
--- 
a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
+++ 
b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema;
 
+import com.alibaba.ttl.TransmittableThreadLocal;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.AbstractEnumerable;
 import org.apache.calcite.linq4j.Enumerable;
@@ -64,15 +64,14 @@ import java.util.List;
 @RequiredArgsConstructor
 public final class SQLFederationTable extends AbstractTable implements 
ModifiableTable, TranslatableTable {
     
+    private static final TransmittableThreadLocal<ScanExecutor> 
SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>();
+    
     private final ShardingSphereTable table;
     
     private final SQLFederationStatistic statistic;
     
     private final DatabaseType protocolType;
     
-    @Setter
-    private ScanExecutor scanExecutor;
-    
     @Override
     public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
         return SQLFederationDataTypeUtils.createRelDataType(table, 
protocolType, typeFactory);
@@ -107,10 +106,10 @@ public final class SQLFederationTable extends 
AbstractTable implements Modifiabl
      * @return enumerable result
      */
     public Enumerable<Object> execute(final DataContext root, final String 
sql, final int[] paramIndexes) {
-        if (null == scanExecutor) {
+        if (null == SCAN_EXECUTOR_HOLDER.get()) {
             return createEmptyEnumerable();
         }
-        return scanExecutor.execute(table, new ScanExecutorContext(root, sql, 
paramIndexes));
+        return SCAN_EXECUTOR_HOLDER.get().execute(table, new 
ScanExecutorContext(root, sql, paramIndexes));
     }
     
     private AbstractEnumerable<Object> createEmptyEnumerable() {
@@ -144,4 +143,20 @@ public final class SQLFederationTable extends 
AbstractTable implements Modifiabl
                                          final List<RexNode> 
sourceExpressionList, final boolean flattened) {
         return LogicalTableModify.create(table, schema, relNode, operation, 
updateColumnList, sourceExpressionList, flattened);
     }
+    
+    /**
+     * Set scan executor.
+     *
+     * @param scanExecutor scan executor
+     */
+    public void setScanExecutor(final ScanExecutor scanExecutor) {
+        SCAN_EXECUTOR_HOLDER.set(scanExecutor);
+    }
+    
+    /**
+     * Clear scan executor.
+     */
+    public void clearScanExecutor() {
+        SCAN_EXECUTOR_HOLDER.remove();
+    }
 }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
index 030e7b591ca..ea1b23c50c0 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java
@@ -49,8 +49,8 @@ public final class JDBCBackendStatement implements 
ExecutorJDBCStatementManager
     }
     
     @Override
-    public Statement createStorageResource(final ExecutionUnit executionUnit, 
final Connection connection, final ConnectionMode connectionMode, final 
StatementOption option,
-                                           final DatabaseType databaseType) 
throws SQLException {
+    public Statement createStorageResource(final ExecutionUnit executionUnit, 
final Connection connection, final int connectionOffset,
+                                           final ConnectionMode 
connectionMode, final StatementOption option, final DatabaseType databaseType) 
throws SQLException {
         String sql = executionUnit.getSqlUnit().getSql();
         List<Object> params = executionUnit.getSqlUnit().getParameters();
         PreparedStatement result = option.isReturnGeneratedKeys()
diff --git 
a/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
 
b/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
index ea5629c58c3..d42a27fd8fd 100644
--- 
a/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
+++ 
b/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java
@@ -113,7 +113,7 @@ class OpenGaussComBatchBindExecutorTest {
         when(databaseConnectionManager.getConnections(any(), 
nullable(String.class), anyInt(), anyInt(), 
any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection));
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         JDBCBackendStatement backendStatement = 
mock(JDBCBackendStatement.class);
-        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), any(ConnectionMode.class), any(StatementOption.class), 
nullable(DatabaseType.class)))
+        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), anyInt(), any(ConnectionMode.class), 
any(StatementOption.class), nullable(DatabaseType.class)))
                 .thenReturn(preparedStatement);
         when(result.getStatementManager()).thenReturn(backendStatement);
         
when(result.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager);
diff --git 
a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
 
b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
index d2f3445f4bd..d812604b60e 100644
--- 
a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
+++ 
b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java
@@ -129,7 +129,7 @@ class 
PostgreSQLAggregatedBatchedStatementsCommandExecutorTest {
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
         JDBCBackendStatement backendStatement = 
mock(JDBCBackendStatement.class);
-        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), any(ConnectionMode.class), any(StatementOption.class), 
nullable(DatabaseType.class)))
+        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
any(Connection.class), anyInt(), any(ConnectionMode.class), 
any(StatementOption.class), nullable(DatabaseType.class)))
                 .thenReturn(preparedStatement);
         when(result.getStatementManager()).thenReturn(backendStatement);
         
when(result.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager);
diff --git 
a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
 
b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
index 5d7f1aa257b..cf30cd5c0b5 100644
--- 
a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
+++ 
b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java
@@ -96,7 +96,7 @@ class PostgreSQLBatchedStatementsExecutorTest {
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
         when(preparedStatement.getConnection()).thenReturn(connection);
         when(preparedStatement.executeBatch()).thenReturn(new int[]{1, 1, 1});
-        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
eq(connection), any(ConnectionMode.class), any(StatementOption.class), 
nullable(DatabaseType.class)))
+        when(backendStatement.createStorageResource(any(ExecutionUnit.class), 
eq(connection), anyInt(), any(ConnectionMode.class), 
any(StatementOption.class), nullable(DatabaseType.class)))
                 .thenReturn(preparedStatement);
         ContextManager contextManager = mockContextManager();
         ConnectionSession connectionSession = mockConnectionSession();

Reply via email to