This is an automated email from the ASF dual-hosted git repository. zhaojinchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 8c0783fd074 Fix Operation not allowed after ResultSet closed exception when use sql federation (#35206) 8c0783fd074 is described below commit 8c0783fd0746ea7346cf01bde338ac59f0c553ff Author: Zhengqiang Duan <duanzhengqi...@apache.org> AuthorDate: Thu Apr 17 15:18:45 2025 +0800 Fix Operation not allowed after ResultSet closed exception when use sql federation (#35206) --- RELEASE-NOTES.md | 1 + .../driver/DriverExecutionPrepareEngine.java | 7 +++--- .../prepare/driver/ExecutorStatementManager.java | 3 ++- .../prepare/driver/SQLExecutionUnitBuilder.java | 3 ++- .../PreparedStatementExecutionUnitBuilder.java | 11 +++++---- .../builder/StatementExecutionUnitBuilder.java | 4 ++-- .../jdbc/core/statement/StatementManager.java | 8 ++++--- .../sqlfederation/engine/SQLFederationEngine.java | 24 +++++++++++-------- .../engine/processor/SQLFederationProcessor.java | 10 ++++++++ .../impl/StandardSQLFederationProcessor.java | 26 +++++++++++++++++---- .../metadata/schema/SQLFederationTable.java | 27 +++++++++++++++++----- .../jdbc/statement/JDBCBackendStatement.java | 4 ++-- .../bind/OpenGaussComBatchBindExecutorTest.java | 2 +- ...egatedBatchedStatementsCommandExecutorTest.java | 2 +- .../PostgreSQLBatchedStatementsExecutorTest.java | 2 +- 15 files changed, 94 insertions(+), 40 deletions(-) diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 5db48b9c340..5b452029f5c 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -44,6 +44,7 @@ 1. JDBC: Fix getting database name from sql statement context - [#34960](https://github.com/apache/shardingsphere/pull/34960) 1. DistSQL: Fix duplicate result when show rules used storage unit with readwrite-splitting rule - [#35129](https://github.com/apache/shardingsphere/pull/35129) 1. Transaction: Fix conflicting dependencies of BASE transaction integration module - [#35142](https://github.com/apache/shardingsphere/pull/35142) +1. SQL Federation: Fix Operation not allowed after ResultSet closed exception when use sql federation - [#35206](https://github.com/apache/shardingsphere/pull/35206) ### Change Logs diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java index 463404ec0e3..00b8c6ed609 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/DriverExecutionPrepareEngine.java @@ -94,18 +94,19 @@ public final class DriverExecutionPrepareEngine<T extends DriverExecutionUnit<?> List<C> connections = databaseConnectionManager.getConnections(databaseName, dataSourceName, connectionOffset, executionUnitGroups.size(), connectionMode); int count = 0; for (List<ExecutionUnit> each : executionUnitGroups) { - result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode)); + result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionOffset, connectionMode)); } return result; } @SuppressWarnings("unchecked") - private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<ExecutionUnit> executionUnits, final C connection, final ConnectionMode connectionMode) throws SQLException { + private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<ExecutionUnit> executionUnits, final C connection, final int connectionOffset, + final ConnectionMode connectionMode) throws SQLException { List<T> inputs = new LinkedList<>(); // TODO use metadata to replace storageUnits to support multiple logic databases DatabaseType databaseType = storageUnits.containsKey(dataSourceName) ? storageUnits.get(dataSourceName).getStorageType() : storageUnits.values().iterator().next().getStorageType(); for (ExecutionUnit each : executionUnits) { - inputs.add((T) sqlExecutionUnitBuilder.build(each, statementManager, connection, connectionMode, option, databaseType)); + inputs.add((T) sqlExecutionUnitBuilder.build(each, statementManager, connection, connectionOffset, connectionMode, option, databaseType)); } return new ExecutionGroup<>(inputs); } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java index 247580cbfe7..8f522487ecd 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/ExecutorStatementManager.java @@ -49,11 +49,12 @@ public interface ExecutorStatementManager<C, R, O extends StorageResourceOption> * * @param executionUnit execution unit * @param connection connection + * @param connectionOffset connection offset * @param connectionMode connection mode * @param option storage resource option * @param databaseType database type * @return storage resource * @throws SQLException SQL exception */ - R createStorageResource(ExecutionUnit executionUnit, C connection, ConnectionMode connectionMode, O option, DatabaseType databaseType) throws SQLException; + R createStorageResource(ExecutionUnit executionUnit, C connection, int connectionOffset, ConnectionMode connectionMode, O option, DatabaseType databaseType) throws SQLException; } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java index 813f93cced6..8be0f7c6b83 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/SQLExecutionUnitBuilder.java @@ -43,11 +43,12 @@ public interface SQLExecutionUnitBuilder<T extends DriverExecutionUnit<?>, M ext * @param executionUnit execution unit * @param executorManager executor manager * @param connection connection + * @param connectionOffset connection offset * @param connectionMode connection mode * @param option storage resource option * @param databaseType database type * @return SQL execution unit * @throws SQLException SQL exception */ - T build(ExecutionUnit executionUnit, M executorManager, C connection, ConnectionMode connectionMode, O option, DatabaseType databaseType) throws SQLException; + T build(ExecutionUnit executionUnit, M executorManager, C connection, int connectionOffset, ConnectionMode connectionMode, O option, DatabaseType databaseType) throws SQLException; } diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java index 3f9ce8658d7..43b2f5690ba 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java @@ -35,16 +35,17 @@ import java.sql.SQLException; public final class PreparedStatementExecutionUnitBuilder implements JDBCExecutionUnitBuilder { @Override - public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, - final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { + public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection, + final int connectionOffset, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { PreparedStatement preparedStatement = createPreparedStatement( - executionUnit, statementManager, connection, connectionMode, option, databaseType); + executionUnit, statementManager, connection, connectionOffset, connectionMode, option, databaseType); return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement); } private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection, - final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { - return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType); + final int connectionOffset, final ConnectionMode connectionMode, final StatementOption option, + final DatabaseType databaseType) throws SQLException { + return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionOffset, connectionMode, option, databaseType); } @Override diff --git a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java index 99df2035b57..ef6b3529d58 100644 --- a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java +++ b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/StatementExecutionUnitBuilder.java @@ -35,8 +35,8 @@ import java.sql.Statement; public final class StatementExecutionUnitBuilder implements JDBCExecutionUnitBuilder { @Override - public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, - final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { + public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection, + final int connectionOffset, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { return new JDBCExecutionUnit(executionUnit, connectionMode, createStatement(statementManager, connection, connectionMode, option, databaseType)); } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java index 58b15d456db..185cd6bdf9b 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java @@ -50,9 +50,9 @@ public final class StatementManager implements ExecutorJDBCStatementManager, Aut } @Override - public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option, - final DatabaseType databaseType) throws SQLException { - CacheKey cacheKey = new CacheKey(executionUnit, connectionMode); + public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final int connectionOffset, + final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { + CacheKey cacheKey = new CacheKey(executionUnit, connectionMode, connectionOffset); Statement result = cachedStatements.get(cacheKey); if (null == result || result.getConnection().isClosed() || result.isClosed()) { Optional.ofNullable(result).ifPresent(optional -> cachedStatements.remove(cacheKey)); @@ -113,5 +113,7 @@ public final class StatementManager implements ExecutorJDBCStatementManager, Aut private final ExecutionUnit executionUnit; private final ConnectionMode connectionMode; + + private final int connectionOffset; } } diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java index f5f6a460b01..dcf54e7e01b 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java @@ -95,13 +95,13 @@ public final class SQLFederationEngine implements AutoCloseable { private final String currentSchemaName; - private final ShardingSphereMetaData metaData; + private final SQLFederationRule sqlFederationRule; - private final ShardingSphereStatistics statistics; + private final SQLFederationProcessor processor; - private final JDBCExecutor jdbcExecutor; + private QueryContext queryContext; - private final SQLFederationRule sqlFederationRule; + private SchemaPlus schemaPlus; private ResultSet resultSet; @@ -110,10 +110,8 @@ public final class SQLFederationEngine implements AutoCloseable { deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules()); this.currentDatabaseName = currentDatabaseName; this.currentSchemaName = currentSchemaName; - this.metaData = metaData; - this.statistics = statistics; - this.jdbcExecutor = jdbcExecutor; sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class); + processor = SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, jdbcExecutor); } /** @@ -184,18 +182,17 @@ public final class SQLFederationEngine implements AutoCloseable { */ public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) { - QueryContext queryContext = federationContext.getQueryContext(); + queryContext = federationContext.getQueryContext(); try { ShardingSpherePreconditions.checkState(queryContext.getSqlStatementContext() instanceof SelectStatementContext, () -> new IllegalArgumentException("SQL statement must be select statement in sql federation engine.")); SelectStatementContext selectStatementContext = (SelectStatementContext) queryContext.getSqlStatementContext(); String databaseName = selectStatementContext.getTablesContext().getDatabaseNames().stream().findFirst().orElse(currentDatabaseName); String schemaName = selectStatementContext.getTablesContext().getSchemaName().orElse(currentSchemaName); - SQLFederationProcessor processor = SQLFederationProcessorFactory.getInstance().newInstance(metaData, statistics, jdbcExecutor); SqlToRelConverter converter = creeateSQLToRelConverter(databaseName, schemaName, selectStatementContext.getDatabaseType(), processor.getConvention()); SQLFederationExecutionPlan executionPlan = compileQuery(converter, databaseName, schemaName, federationContext.getMetaData(), selectStatementContext, queryContext.getSql(), processor.getConvention()); - SchemaPlus schemaPlus = getSqlFederationSchema(converter, schemaName, queryContext.getSql()); + schemaPlus = getSqlFederationSchema(converter, schemaName, queryContext.getSql()); processor.registerExecutor(prepareEngine, callback, databaseName, schemaName, federationContext, sqlFederationRule.getOptimizerContext(), schemaPlus); resultSet = processor.executePlan(prepareEngine, callback, executionPlan, converter, federationContext, schemaPlus); return resultSet; @@ -249,6 +246,7 @@ public final class SQLFederationEngine implements AutoCloseable { public void close() throws SQLException { Collection<SQLException> result = new LinkedList<>(); closeResultSet().ifPresent(result::add); + unregisterExecutor(); if (result.isEmpty()) { return; } @@ -267,4 +265,10 @@ public final class SQLFederationEngine implements AutoCloseable { } return Optional.empty(); } + + private void unregisterExecutor() { + if (null != queryContext && null != schemaPlus) { + processor.unregisterExecutor(queryContext, schemaPlus); + } + } } diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java index b93ea2a055e..680d261cfd3 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/SQLFederationProcessor.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan; import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext; @@ -51,6 +52,15 @@ public interface SQLFederationProcessor { String databaseName, String schemaName, SQLFederationContext federationContext, OptimizerContext optimizerContext, SchemaPlus schemaPlus) { } + /** + * Unregister executor. + * + * @param queryContext query context + * @param schemaPlus sql federation schema + */ + default void unregisterExecutor(QueryContext queryContext, SchemaPlus schemaPlus) { + } + /** * Execute plan. * diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java index 90463b4100d..d3938d715d5 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/processor/impl/StandardSQLFederationProcessor.java @@ -29,14 +29,16 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.Table; import org.apache.calcite.sql2rel.SqlToRelConverter; import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.binder.context.type.TableAvailable; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; -import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; +import org.apache.shardingsphere.infra.session.query.QueryContext; +import org.apache.shardingsphere.sql.parser.statement.core.segment.generic.table.SimpleTableSegment; import org.apache.shardingsphere.sqlfederation.engine.processor.SQLFederationProcessor; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; @@ -49,6 +51,7 @@ import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet; import java.sql.Connection; import java.sql.ResultSet; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -76,15 +79,30 @@ public final class StandardSQLFederationProcessor implements SQLFederationProces SQLFederationExecutorContext executorContext = new SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps()); EnumerableScanExecutor scanExecutor = new EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, executorContext, federationContext, metaData.getGlobalRuleMetaData(), statistics); - // TODO register only the required tables - for (ShardingSphereTable each : metaData.getDatabase(databaseName).getSchema(schemaName).getAllTables()) { - Table table = schemaPlus.tables().get(each.getName()); + Collection<SimpleTableSegment> simpleTables = federationContext.getQueryContext().getSqlStatementContext() instanceof TableAvailable + ? ((TableAvailable) federationContext.getQueryContext().getSqlStatementContext()).getTablesContext().getSimpleTables() + : Collections.emptyList(); + for (SimpleTableSegment each : simpleTables) { + Table table = schemaPlus.tables().get(each.getTableName().getIdentifier().getValue()); if (table instanceof SQLFederationTable) { ((SQLFederationTable) table).setScanExecutor(scanExecutor); } } } + @Override + public void unregisterExecutor(final QueryContext queryContext, final SchemaPlus schemaPlus) { + Collection<SimpleTableSegment> simpleTables = queryContext.getSqlStatementContext() instanceof TableAvailable + ? ((TableAvailable) queryContext.getSqlStatementContext()).getTablesContext().getSimpleTables() + : Collections.emptyList(); + for (SimpleTableSegment each : simpleTables) { + Table table = schemaPlus.tables().get(each.getTableName().getIdentifier().getValue()); + if (table instanceof SQLFederationTable) { + ((SQLFederationTable) table).clearScanExecutor(); + } + } + } + @SuppressWarnings("unchecked") @Override public ResultSet executePlan(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, diff --git a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java index 760adcca8f3..20f84cd324f 100644 --- a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java +++ b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema; +import com.alibaba.ttl.TransmittableThreadLocal; import lombok.RequiredArgsConstructor; -import lombok.Setter; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; @@ -64,15 +64,14 @@ import java.util.List; @RequiredArgsConstructor public final class SQLFederationTable extends AbstractTable implements ModifiableTable, TranslatableTable { + private static final TransmittableThreadLocal<ScanExecutor> SCAN_EXECUTOR_HOLDER = new TransmittableThreadLocal<>(); + private final ShardingSphereTable table; private final SQLFederationStatistic statistic; private final DatabaseType protocolType; - @Setter - private ScanExecutor scanExecutor; - @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) { return SQLFederationDataTypeUtils.createRelDataType(table, protocolType, typeFactory); @@ -107,10 +106,10 @@ public final class SQLFederationTable extends AbstractTable implements Modifiabl * @return enumerable result */ public Enumerable<Object> execute(final DataContext root, final String sql, final int[] paramIndexes) { - if (null == scanExecutor) { + if (null == SCAN_EXECUTOR_HOLDER.get()) { return createEmptyEnumerable(); } - return scanExecutor.execute(table, new ScanExecutorContext(root, sql, paramIndexes)); + return SCAN_EXECUTOR_HOLDER.get().execute(table, new ScanExecutorContext(root, sql, paramIndexes)); } private AbstractEnumerable<Object> createEmptyEnumerable() { @@ -144,4 +143,20 @@ public final class SQLFederationTable extends AbstractTable implements Modifiabl final List<RexNode> sourceExpressionList, final boolean flattened) { return LogicalTableModify.create(table, schema, relNode, operation, updateColumnList, sourceExpressionList, flattened); } + + /** + * Set scan executor. + * + * @param scanExecutor scan executor + */ + public void setScanExecutor(final ScanExecutor scanExecutor) { + SCAN_EXECUTOR_HOLDER.set(scanExecutor); + } + + /** + * Clear scan executor. + */ + public void clearScanExecutor() { + SCAN_EXECUTOR_HOLDER.remove(); + } } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java index 030e7b591ca..ea1b23c50c0 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/statement/JDBCBackendStatement.java @@ -49,8 +49,8 @@ public final class JDBCBackendStatement implements ExecutorJDBCStatementManager } @Override - public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option, - final DatabaseType databaseType) throws SQLException { + public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final int connectionOffset, + final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { String sql = executionUnit.getSqlUnit().getSql(); List<Object> params = executionUnit.getSqlUnit().getParameters(); PreparedStatement result = option.isReturnGeneratedKeys() diff --git a/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java b/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java index ea5629c58c3..d42a27fd8fd 100644 --- a/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java +++ b/proxy/frontend/type/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java @@ -113,7 +113,7 @@ class OpenGaussComBatchBindExecutorTest { when(databaseConnectionManager.getConnections(any(), nullable(String.class), anyInt(), anyInt(), any(ConnectionMode.class))).thenReturn(Collections.singletonList(connection)); PreparedStatement preparedStatement = mock(PreparedStatement.class); JDBCBackendStatement backendStatement = mock(JDBCBackendStatement.class); - when(backendStatement.createStorageResource(any(ExecutionUnit.class), any(Connection.class), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) + when(backendStatement.createStorageResource(any(ExecutionUnit.class), any(Connection.class), anyInt(), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) .thenReturn(preparedStatement); when(result.getStatementManager()).thenReturn(backendStatement); when(result.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager); diff --git a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java index d2f3445f4bd..d812604b60e 100644 --- a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java +++ b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLAggregatedBatchedStatementsCommandExecutorTest.java @@ -129,7 +129,7 @@ class PostgreSQLAggregatedBatchedStatementsCommandExecutorTest { PreparedStatement preparedStatement = mock(PreparedStatement.class); when(preparedStatement.getConnection()).thenReturn(connection); JDBCBackendStatement backendStatement = mock(JDBCBackendStatement.class); - when(backendStatement.createStorageResource(any(ExecutionUnit.class), any(Connection.class), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) + when(backendStatement.createStorageResource(any(ExecutionUnit.class), any(Connection.class), anyInt(), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) .thenReturn(preparedStatement); when(result.getStatementManager()).thenReturn(backendStatement); when(result.getDatabaseConnectionManager()).thenReturn(databaseConnectionManager); diff --git a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java index 5d7f1aa257b..cf30cd5c0b5 100644 --- a/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java +++ b/proxy/frontend/type/postgresql/src/test/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/extended/PostgreSQLBatchedStatementsExecutorTest.java @@ -96,7 +96,7 @@ class PostgreSQLBatchedStatementsExecutorTest { PreparedStatement preparedStatement = mock(PreparedStatement.class); when(preparedStatement.getConnection()).thenReturn(connection); when(preparedStatement.executeBatch()).thenReturn(new int[]{1, 1, 1}); - when(backendStatement.createStorageResource(any(ExecutionUnit.class), eq(connection), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) + when(backendStatement.createStorageResource(any(ExecutionUnit.class), eq(connection), anyInt(), any(ConnectionMode.class), any(StatementOption.class), nullable(DatabaseType.class))) .thenReturn(preparedStatement); ContextManager contextManager = mockContextManager(); ConnectionSession connectionSession = mockConnectionSession();