This is an automated email from the ASF dual-hosted git repository. chengzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 26c7ddcccb5 Add SQLFederationEngine interface to standardize sql federation query engine calls (#35039) 26c7ddcccb5 is described below commit 26c7ddcccb5dc6a8f0701a7c11fae93d57b55f7a Author: Zhengqiang Duan <duanzhengqi...@apache.org> AuthorDate: Thu Mar 20 12:50:42 2025 +0800 Add SQLFederationEngine interface to standardize sql federation query engine calls (#35039) --- .../executor/engine/DriverExecuteExecutor.java | 2 +- .../engine/facade/DriverExecutorFacade.java | 5 +- .../sqlfederation/engine/SQLFederationEngine.java | 251 ++------------------- .../engine/SQLFederationEngineFactory.java | 58 +++++ .../StandardSQLFederationEngine.java} | 39 +--- ...t.java => StandardSQLFederationEngineTest.java} | 21 +- .../proxy/backend/connector/ProxySQLExecutor.java | 4 +- .../connector/StandardDatabaseConnector.java | 2 +- .../handler/distsql/rul/PreviewExecutor.java | 4 +- .../OpenGaussSystemCatalogAdminQueryExecutor.java | 5 +- 10 files changed, 114 insertions(+), 277 deletions(-) diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java index 11aa2b768aa..283119a189f 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java @@ -103,7 +103,7 @@ public final class DriverExecuteExecutor { } FederationMetaDataRefreshEngine federationMetaDataRefreshEngine = new FederationMetaDataRefreshEngine( connection.getContextManager().getPersistServiceFacade().getModeFacade().getMetaDataManagerService(), database); - if (sqlFederationEngine.enabled() && federationMetaDataRefreshEngine.isNeedRefresh(queryContext.getSqlStatementContext())) { + if (sqlFederationEngine.isSqlFederationEnabled() && federationMetaDataRefreshEngine.isNeedRefresh(queryContext.getSqlStatementContext())) { federationMetaDataRefreshEngine.refresh(queryContext.getSqlStatementContext()); return true; } diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java index 3a193e56273..477a371acf2 100644 --- a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java +++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/facade/DriverExecutorFacade.java @@ -40,6 +40,7 @@ import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; +import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngineFactory; import java.sql.Connection; import java.sql.ResultSet; @@ -80,8 +81,8 @@ public final class DriverExecutorFacade implements AutoCloseable { JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext()); ShardingSphereMetaData metaData = connection.getContextManager().getMetaDataContexts().getMetaData(); String currentSchemaName = new DatabaseTypeRegistry(metaData.getDatabase(connection.getCurrentDatabaseName()).getProtocolType()).getDefaultSchemaName(connection.getCurrentDatabaseName()); - sqlFederationEngine = - new SQLFederationEngine(connection.getCurrentDatabaseName(), currentSchemaName, metaData, connection.getContextManager().getMetaDataContexts().getStatistics(), jdbcExecutor); + sqlFederationEngine = SQLFederationEngineFactory.getInstance().newInstance(connection.getCurrentDatabaseName(), currentSchemaName, metaData, + connection.getContextManager().getMetaDataContexts().getStatistics(), jdbcExecutor); transactionExecutor = new DriverTransactionSQLStatementExecutor(connection); RawExecutor rawExecutor = new RawExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext()); queryExecutor = new DriverExecuteQueryExecutor(connection, metaData, jdbcExecutor, rawExecutor, sqlFederationEngine); diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java index c5e4e585567..79a7eb33bc6 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java @@ -17,118 +17,30 @@ package org.apache.shardingsphere.sqlfederation.engine; -import lombok.Getter; -import org.apache.calcite.adapter.enumerable.EnumerableInterpretable; -import org.apache.calcite.adapter.enumerable.EnumerableRel; -import org.apache.calcite.adapter.java.JavaTypeFactory; -import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.config.CalciteConnectionConfigImpl; -import org.apache.calcite.jdbc.JavaTypeFactoryImpl; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.prepare.CalciteCatalogReader; -import org.apache.calcite.runtime.Bindable; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.Table; -import org.apache.calcite.sql.validate.SqlValidator; -import org.apache.calcite.sql2rel.SqlToRelConverter; -import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; -import org.apache.shardingsphere.infra.binder.context.statement.dml.SelectStatementContext; -import org.apache.shardingsphere.infra.datanode.DataNode; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; -import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.table.NoSuchTableException; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; -import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; -import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine; -import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; -import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData; -import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema; -import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable; -import org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchemaUtils; -import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; -import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.session.query.QueryContext; -import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; -import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext; -import org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor; -import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationCompilerEngine; -import org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan; -import org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext; -import org.apache.shardingsphere.sqlfederation.optimizer.context.planner.OptimizerMetaData; -import org.apache.shardingsphere.sqlfederation.optimizer.exception.SQLFederationSchemaNotFoundException; import org.apache.shardingsphere.sqlfederation.optimizer.exception.SQLFederationUnsupportedSQLException; -import org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.SQLFederationTable; -import org.apache.shardingsphere.sqlfederation.optimizer.planner.cache.ExecutionPlanCacheKey; -import org.apache.shardingsphere.sqlfederation.optimizer.planner.util.SQLFederationPlannerUtils; -import org.apache.shardingsphere.sqlfederation.optimizer.statement.SQLStatementCompiler; -import org.apache.shardingsphere.sqlfederation.resultset.SQLFederationResultSet; -import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule; -import org.apache.shardingsphere.sqlfederation.spi.SQLFederationDecider; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; /** * SQL federation engine. */ -@Getter -public final class SQLFederationEngine implements AutoCloseable { - - private static final int DEFAULT_METADATA_VERSION = 0; - - private static final JavaTypeFactory DEFAULT_DATA_TYPE_FACTORY = new JavaTypeFactoryImpl(); - - private final ProcessEngine processEngine = new ProcessEngine(); - - @SuppressWarnings("rawtypes") - private final Map<ShardingSphereRule, SQLFederationDecider> deciders; - - private final String currentDatabaseName; - - private final String currentSchemaName; - - private final ShardingSphereMetaData metaData; - - private final ShardingSphereStatistics statistics; - - private final JDBCExecutor jdbcExecutor; - - private final SQLFederationRule sqlFederationRule; - - private ResultSet resultSet; - - public SQLFederationEngine(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, - final JDBCExecutor jdbcExecutor) { - deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules()); - this.currentDatabaseName = currentDatabaseName; - this.currentSchemaName = currentSchemaName; - this.metaData = metaData; - this.statistics = statistics; - this.jdbcExecutor = jdbcExecutor; - sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class); - } +public interface SQLFederationEngine extends AutoCloseable { /** - * SQL federation enabled or not. + * Judge whether SQL federation enabled or not. * - * @return enabled or not + * @return whether SQL federation enabled or not */ - public boolean enabled() { - return sqlFederationRule.getConfiguration().isSqlFederationEnabled(); - } + boolean isSqlFederationEnabled(); /** * Decide use SQL federation or not. @@ -137,48 +49,7 @@ public final class SQLFederationEngine implements AutoCloseable { * @param globalRuleMetaData global rule meta data * @return use SQL federation or not */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public boolean decide(final QueryContext queryContext, final RuleMetaData globalRuleMetaData) { - // TODO BEGIN: move this logic to SQLFederationDecider implement class when we remove sql federation type - if (isQuerySystemSchema(queryContext)) { - return true; - } - // TODO END - SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext(); - boolean sqlFederationEnabled = sqlFederationRule.getConfiguration().isSqlFederationEnabled(); - if (!sqlFederationEnabled || !(sqlStatementContext instanceof SelectStatementContext)) { - return false; - } - boolean allQueryUseSQLFederation = sqlFederationRule.getConfiguration().isAllQueryUseSQLFederation(); - if (allQueryUseSQLFederation) { - return true; - } - SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext; - Collection<String> databaseNames = selectStatementContext.getTablesContext().getDatabaseNames(); - if (databaseNames.size() > 1) { - return true; - } - ShardingSphereDatabase usedDatabase = queryContext.getUsedDatabase(); - Collection<DataNode> includedDataNodes = new HashSet<>(); - for (Entry<ShardingSphereRule, SQLFederationDecider> entry : deciders.entrySet()) { - boolean isUseSQLFederation = entry.getValue().decide(selectStatementContext, queryContext.getParameters(), globalRuleMetaData, usedDatabase, entry.getKey(), includedDataNodes); - if (isUseSQLFederation) { - return true; - } - } - return false; - } - - private boolean isQuerySystemSchema(final QueryContext queryContext) { - SQLStatementContext sqlStatementContext = queryContext.getSqlStatementContext(); - if (!(sqlStatementContext instanceof SelectStatementContext)) { - return false; - } - SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext; - ShardingSphereDatabase database = queryContext.getUsedDatabase(); - return SystemSchemaUtils.containsSystemSchema(sqlStatementContext.getDatabaseType(), selectStatementContext.getTablesContext().getSchemaNames(), database) - || SystemSchemaUtils.isOpenGaussSystemCatalogQuery(sqlStatementContext.getDatabaseType(), selectStatementContext.getSqlStatement().getProjections().getProjections()); - } + boolean decide(QueryContext queryContext, RuleMetaData globalRuleMetaData); /** * Execute query. @@ -189,104 +60,20 @@ public final class SQLFederationEngine implements AutoCloseable { * @return result set * @throws SQLFederationUnsupportedSQLException SQL federation unsupported SQL exception */ - public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, - final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) { - try { - SelectStatementContext selectStatementContext = (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(); - String databaseName = selectStatementContext.getTablesContext().getDatabaseNames().stream().findFirst().orElse(currentDatabaseName); - String schemaName = selectStatementContext.getTablesContext().getSchemaName().orElse(currentSchemaName); - OptimizerMetaData optimizerMetaData = sqlFederationRule.getOptimizerContext().getMetaData(databaseName); - CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDialectProps()); - CalciteCatalogReader catalogReader = SQLFederationPlannerUtils.createCatalogReader(schemaName, optimizerMetaData.getSchema(schemaName), DEFAULT_DATA_TYPE_FACTORY, connectionConfig, - selectStatementContext.getDatabaseType()); - SqlValidator validator = SQLFederationPlannerUtils.createSqlValidator(catalogReader, DEFAULT_DATA_TYPE_FACTORY, - sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), connectionConfig); - SqlToRelConverter converter = SQLFederationPlannerUtils.createSqlToRelConverter(catalogReader, validator, SQLFederationPlannerUtils.createRelOptCluster(DEFAULT_DATA_TYPE_FACTORY), - sqlFederationRule.getOptimizerContext().getSqlParserRule(), sqlFederationRule.getOptimizerContext().getParserContext(databaseName).getDatabaseType(), true); - Schema sqlFederationSchema = catalogReader.getRootSchema().plus().getSubSchema(schemaName); - ShardingSpherePreconditions.checkNotNull(sqlFederationSchema, () -> new SQLFederationSchemaNotFoundException(federationContext.getQueryContext().getSql())); - SQLFederationExecutionPlan executionPlan = compileQuery(prepareEngine, callback, federationContext, databaseName, schemaName, sqlFederationSchema, converter); - resultSet = executePlan(federationContext, executionPlan, validator, converter, sqlFederationSchema); - return resultSet; - // CHECKSTYLE:OFF - } catch (final Exception ex) { - // CHECKSTYLE:ON - throw new SQLFederationUnsupportedSQLException(federationContext.getQueryContext().getSql(), ex); - } - } - - private SQLFederationExecutionPlan compileQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, - final SQLFederationContext federationContext, final String databaseName, final String schemaName, final Schema sqlFederationSchema, - final SqlToRelConverter converter) { - SQLStatementContext sqlStatementContext = federationContext.getQueryContext().getSqlStatementContext(); - ShardingSpherePreconditions.checkState(sqlStatementContext instanceof SelectStatementContext, () -> new IllegalArgumentException("SQL statement context must be select statement context.")); - registerTableScanExecutor(sqlFederationSchema, prepareEngine, callback, federationContext, sqlFederationRule.getOptimizerContext(), databaseName, schemaName); - SQLStatementCompiler sqlStatementCompiler = new SQLStatementCompiler(converter); - SQLFederationCompilerEngine compilerEngine = new SQLFederationCompilerEngine(databaseName, schemaName, sqlFederationRule.getConfiguration().getExecutionPlanCache()); - // TODO open useCache flag when ShardingSphereTable contains version - return compilerEngine.compile(buildCacheKey(federationContext, (SelectStatementContext) sqlStatementContext, sqlStatementCompiler, databaseName, schemaName), false); - } - - @SuppressWarnings("unchecked") - private ResultSet executePlan(final SQLFederationContext federationContext, final SQLFederationExecutionPlan executionPlan, final SqlValidator validator, final SqlToRelConverter converter, - final Schema sqlFederationSchema) { - try { - Bindable<Object> executablePlan = EnumerableInterpretable.toBindable(Collections.emptyMap(), null, (EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY); - Map<String, Object> params = createParameters(federationContext.getQueryContext().getParameters()); - Enumerator<Object> enumerator = executablePlan.bind(new SQLFederationBindContext(validator, converter, params)).enumerator(); - return new SQLFederationResultSet(enumerator, sqlFederationSchema, (SelectStatementContext) federationContext.getQueryContext().getSqlStatementContext(), - executionPlan.getResultColumnType()); - } finally { - processEngine.completeSQLExecution(federationContext.getProcessId()); - } - } + ResultSet executeQuery(DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, + JDBCExecutorCallback<? extends ExecuteResult> callback, SQLFederationContext federationContext); - private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext federationContext, final SelectStatementContext selectStatementContext, - final SQLStatementCompiler sqlStatementCompiler, final String databaseName, final String schemaName) { - ShardingSphereSchema schema = federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName); - ExecutionPlanCacheKey result = - new ExecutionPlanCacheKey(federationContext.getQueryContext().getSql(), selectStatementContext.getSqlStatement(), selectStatementContext.getDatabaseType().getType(), - sqlStatementCompiler); - for (String each : selectStatementContext.getTablesContext().getTableNames()) { - ShardingSphereTable table = schema.getTable(each); - ShardingSpherePreconditions.checkNotNull(table, () -> new NoSuchTableException(each)); - // TODO replace DEFAULT_METADATA_VERSION with actual version in ShardingSphereTable - result.getTableMetaDataVersions().put(table.getName(), DEFAULT_METADATA_VERSION); - } - return result; - } - - private void registerTableScanExecutor(final Schema sqlFederationSchema, final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, - final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext, - final OptimizerContext optimizerContext, final String databaseName, final String schemaName) { - if (null == sqlFederationSchema) { - return; - } - SQLFederationExecutorContext executorContext = new SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps()); - EnumerableScanExecutor scanExecutor = - new EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext, executorContext, federationContext, metaData.getGlobalRuleMetaData(), statistics); - // TODO register only the required tables - for (ShardingSphereTable each : metaData.getDatabase(databaseName).getSchema(schemaName).getAllTables()) { - Table table = sqlFederationSchema.getTable(each.getName()); - if (table instanceof SQLFederationTable) { - ((SQLFederationTable) table).setScanExecutor(scanExecutor); - } - } - } - - private Map<String, Object> createParameters(final List<Object> params) { - Map<String, Object> result = new HashMap<>(params.size(), 1F); - int index = 0; - for (Object each : params) { - result.put("?" + index++, each); - } - return result; - } + /** + * Get result set. + * + * @return result set + */ + ResultSet getResultSet(); - @Override - public void close() throws SQLException { - if (null != resultSet) { - resultSet.close(); - } - } + /** + * Close. + * + * @throws SQLException SQL exception + */ + void close() throws SQLException; } diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineFactory.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineFactory.java new file mode 100644 index 00000000000..b2876a8f3bd --- /dev/null +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.sqlfederation.engine; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; +import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics; +import org.apache.shardingsphere.sqlfederation.engine.impl.StandardSQLFederationEngine; + +/** + * SQL federation engine factory. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class SQLFederationEngineFactory { + + private static final SQLFederationEngineFactory INSTANCE = new SQLFederationEngineFactory(); + + /** + * Get backend handler factory instance. + * + * @return backend handler factory + */ + public static SQLFederationEngineFactory getInstance() { + return INSTANCE; + } + + /** + * Create new instance of {@link SQLFederationEngine}. + * + * @param currentDatabaseName current database name + * @param currentSchemaName current schema name + * @param metaData shardingSphere meta data + * @param statistics shardingSphere statistics + * @param jdbcExecutor JDBC executor + * @return created instance + */ + public SQLFederationEngine newInstance(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, + final JDBCExecutor jdbcExecutor) { + return new StandardSQLFederationEngine(currentDatabaseName, currentSchemaName, metaData, statistics, jdbcExecutor); + } +} diff --git a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/impl/StandardSQLFederationEngine.java similarity index 94% copy from kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java copy to kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/impl/StandardSQLFederationEngine.java index c5e4e585567..82c46e248e5 100644 --- a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java +++ b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/impl/StandardSQLFederationEngine.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.sqlfederation.engine; +package org.apache.shardingsphere.sqlfederation.engine.impl; import lombok.Getter; import org.apache.calcite.adapter.enumerable.EnumerableInterpretable; @@ -52,6 +52,7 @@ import org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatist import org.apache.shardingsphere.infra.rule.ShardingSphereRule; import org.apache.shardingsphere.infra.session.query.QueryContext; import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader; +import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext; @@ -82,10 +83,10 @@ import java.util.Map; import java.util.Map.Entry; /** - * SQL federation engine. + * Standard SQL federation engine. */ @Getter -public final class SQLFederationEngine implements AutoCloseable { +public final class StandardSQLFederationEngine implements SQLFederationEngine { private static final int DEFAULT_METADATA_VERSION = 0; @@ -110,8 +111,8 @@ public final class SQLFederationEngine implements AutoCloseable { private ResultSet resultSet; - public SQLFederationEngine(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, - final JDBCExecutor jdbcExecutor) { + public StandardSQLFederationEngine(final String currentDatabaseName, final String currentSchemaName, final ShardingSphereMetaData metaData, final ShardingSphereStatistics statistics, + final JDBCExecutor jdbcExecutor) { deciders = OrderedSPILoader.getServices(SQLFederationDecider.class, metaData.getDatabase(currentDatabaseName).getRuleMetaData().getRules()); this.currentDatabaseName = currentDatabaseName; this.currentSchemaName = currentSchemaName; @@ -121,23 +122,13 @@ public final class SQLFederationEngine implements AutoCloseable { sqlFederationRule = metaData.getGlobalRuleMetaData().getSingleRule(SQLFederationRule.class); } - /** - * SQL federation enabled or not. - * - * @return enabled or not - */ - public boolean enabled() { + @Override + public boolean isSqlFederationEnabled() { return sqlFederationRule.getConfiguration().isSqlFederationEnabled(); } - /** - * Decide use SQL federation or not. - * - * @param queryContext query context - * @param globalRuleMetaData global rule meta data - * @return use SQL federation or not - */ @SuppressWarnings({"unchecked", "rawtypes"}) + @Override public boolean decide(final QueryContext queryContext, final RuleMetaData globalRuleMetaData) { // TODO BEGIN: move this logic to SQLFederationDecider implement class when we remove sql federation type if (isQuerySystemSchema(queryContext)) { @@ -180,15 +171,7 @@ public final class SQLFederationEngine implements AutoCloseable { || SystemSchemaUtils.isOpenGaussSystemCatalogQuery(sqlStatementContext.getDatabaseType(), selectStatementContext.getSqlStatement().getProjections().getProjections()); } - /** - * Execute query. - * - * @param prepareEngine prepare engine - * @param callback callback - * @param federationContext federation context - * @return result set - * @throws SQLFederationUnsupportedSQLException SQL federation unsupported SQL exception - */ + @Override public ResultSet executeQuery(final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine, final JDBCExecutorCallback<? extends ExecuteResult> callback, final SQLFederationContext federationContext) { try { @@ -285,7 +268,7 @@ public final class SQLFederationEngine implements AutoCloseable { @Override public void close() throws SQLException { - if (null != resultSet) { + if (null != resultSet && !resultSet.isClosed()) { resultSet.close(); } } diff --git a/kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineTest.java b/kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/StandardSQLFederationEngineTest.java similarity index 90% rename from kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineTest.java rename to kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/StandardSQLFederationEngineTest.java index f954bc2a7e8..d7f22ae0668 100644 --- a/kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngineTest.java +++ b/kernel/sql-federation/core/src/test/java/org/apache/shardingsphere/sqlfederation/engine/StandardSQLFederationEngineTest.java @@ -32,6 +32,7 @@ import org.apache.shardingsphere.sql.parser.api.CacheOption; import org.apache.shardingsphere.sqlfederation.config.SQLFederationRuleConfiguration; import org.apache.shardingsphere.sqlfederation.engine.fixture.rule.SQLFederationDeciderRuleMatchFixture; import org.apache.shardingsphere.sqlfederation.engine.fixture.rule.SQLFederationDeciderRuleNotMatchFixture; +import org.apache.shardingsphere.sqlfederation.engine.impl.StandardSQLFederationEngine; import org.apache.shardingsphere.sqlfederation.rule.SQLFederationRule; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -51,7 +52,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -class SQLFederationEngineTest { +class StandardSQLFederationEngineTest { private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "MySQL"); @@ -70,15 +71,15 @@ class SQLFederationEngineTest { when(queryContext.getSqlStatementContext()).thenReturn(sqlStatementContext); ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db", databaseType, mock(ResourceMetaData.class, RETURNS_DEEP_STUBS), mock(RuleMetaData.class), Collections.emptyList()); when(queryContext.getUsedDatabase()).thenReturn(database); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); assertTrue(engine.decide(queryContext, mock(RuleMetaData.class))); engine.close(); } - private SQLFederationEngine createSQLFederationEngine(final Collection<ShardingSphereRule> globalRules, final Collection<ShardingSphereRule> databaseRules) { + private StandardSQLFederationEngine createSQLFederationEngine(final Collection<ShardingSphereRule> globalRules, final Collection<ShardingSphereRule> databaseRules) { when(metaData.getDatabase("foo_db").getRuleMetaData().getRules()).thenReturn(databaseRules); when(metaData.getGlobalRuleMetaData()).thenReturn(new RuleMetaData(globalRules)); - return new SQLFederationEngine("foo_db", "foo_db", metaData, mock(ShardingSphereStatistics.class), mock(JDBCExecutor.class)); + return new StandardSQLFederationEngine("foo_db", "foo_db", metaData, mock(ShardingSphereStatistics.class), mock(JDBCExecutor.class)); } @Test @@ -86,7 +87,7 @@ class SQLFederationEngineTest { Collection<ShardingSphereRule> globalRules = Collections .singletonList(new SQLFederationRule(new SQLFederationRuleConfiguration(false, false, mock(CacheOption.class)), Collections.emptyList())); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); RuleMetaData globalRuleMetaData = new RuleMetaData(globalRules); assertFalse(engine.decide(mock(QueryContext.class), globalRuleMetaData)); engine.close(); @@ -104,7 +105,7 @@ class SQLFederationEngineTest { QueryContext queryContext = mock(QueryContext.class); when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext); when(queryContext.getUsedDatabase()).thenReturn(database); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); RuleMetaData globalRuleMetaData = new RuleMetaData(globalRules); assertTrue(engine.decide(queryContext, globalRuleMetaData)); engine.close(); @@ -114,7 +115,7 @@ class SQLFederationEngineTest { void assertDecideWhenExecuteNotSelectStatement() throws SQLException { Collection<ShardingSphereRule> globalRules = Collections.singletonList(new SQLFederationRule(new SQLFederationRuleConfiguration(true, false, mock(CacheOption.class)), Collections.emptyList())); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, Collections.emptyList()); RuleMetaData globalRuleMetaData = new RuleMetaData(globalRules); assertFalse(engine.decide(mock(QueryContext.class), globalRuleMetaData)); engine.close(); @@ -133,7 +134,7 @@ class SQLFederationEngineTest { QueryContext queryContext = mock(QueryContext.class); when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext); when(queryContext.getUsedDatabase()).thenReturn(database); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); RuleMetaData globalRuleMetaData = new RuleMetaData(globalRules); assertTrue(engine.decide(queryContext, globalRuleMetaData)); engine.close(); @@ -152,7 +153,7 @@ class SQLFederationEngineTest { QueryContext queryContext = mock(QueryContext.class); when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext); when(queryContext.getUsedDatabase()).thenReturn(database); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); assertFalse(engine.decide(queryContext, new RuleMetaData(globalRules))); engine.close(); } @@ -172,7 +173,7 @@ class SQLFederationEngineTest { when(queryContext.getSqlStatementContext()).thenReturn(selectStatementContext); when(queryContext.getParameters()).thenReturn(Collections.emptyList()); when(queryContext.getUsedDatabase()).thenReturn(database); - SQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); + StandardSQLFederationEngine engine = createSQLFederationEngine(globalRules, databaseRules); assertTrue(engine.decide(queryContext, new RuleMetaData(globalRules))); engine.close(); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java index 332c2841186..09b2cfdf9ec 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutor.java @@ -68,6 +68,7 @@ import org.apache.shardingsphere.sql.parser.statement.opengauss.OpenGaussStateme import org.apache.shardingsphere.sql.parser.statement.opengauss.ddl.OpenGaussCursorStatement; import org.apache.shardingsphere.sql.parser.statement.postgresql.PostgreSQLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; +import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngineFactory; import org.apache.shardingsphere.transaction.api.TransactionType; import org.apache.shardingsphere.transaction.spi.TransactionHook; @@ -113,7 +114,8 @@ public final class ProxySQLExecutor { ? databaseConnectionManager.getConnectionSession().getUsedDatabaseName() : databaseConnectionManager.getConnectionSession().getCurrentDatabaseName(); String currentSchemaName = getSchemaName(queryContext.getSqlStatementContext(), metaDataContexts.getMetaData().getDatabase(currentDatabaseName)); - sqlFederationEngine = new SQLFederationEngine(currentDatabaseName, currentSchemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor); + sqlFederationEngine = + SQLFederationEngineFactory.getInstance().newInstance(currentDatabaseName, currentSchemaName, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor); } private String getSchemaName(final SQLStatementContext sqlStatementContext, final ShardingSphereDatabase database) { diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java index 1129f65ea94..a7a1978c50b 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java @@ -187,7 +187,7 @@ public final class StandardDatabaseConnector implements DatabaseConnector { if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext, contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) { return processExecuteFederation(doExecuteFederation()); } - if (proxySQLExecutor.getSqlFederationEngine().enabled() && federationMetaDataRefreshEngine.isNeedRefresh(queryContext.getSqlStatementContext())) { + if (proxySQLExecutor.getSqlFederationEngine().isSqlFederationEnabled() && federationMetaDataRefreshEngine.isNeedRefresh(queryContext.getSqlStatementContext())) { federationMetaDataRefreshEngine.refresh(queryContext.getSqlStatementContext()); return new UpdateResponseHeader(queryContext.getSqlStatementContext().getSqlStatement()); } diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java index ae9a7909c67..c6763e14dec 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java @@ -58,6 +58,7 @@ import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnection import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; +import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngineFactory; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import java.sql.Connection; @@ -114,7 +115,8 @@ public final class PreviewExecutor implements DistSQLQueryExecutor<PreviewStatem private Collection<ExecutionUnit> getExecutionUnits(final ContextManager contextManager, final String schemaName, final ShardingSphereMetaData metaData, final QueryContext queryContext) { JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionContext.getQueryContext().getConnectionContext()); - SQLFederationEngine federationEngine = new SQLFederationEngine(database.getName(), schemaName, metaData, contextManager.getMetaDataContexts().getStatistics(), jdbcExecutor); + SQLFederationEngine federationEngine = + SQLFederationEngineFactory.getInstance().newInstance(database.getName(), schemaName, metaData, contextManager.getMetaDataContexts().getStatistics(), jdbcExecutor); if (federationEngine.decide(queryContext, metaData.getGlobalRuleMetaData())) { return getFederationExecutionUnits(queryContext, metaData, federationEngine); } diff --git a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java index abc39abd660..f399f291863 100644 --- a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java +++ b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java @@ -48,6 +48,7 @@ import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; import org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult; import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement; import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine; +import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngineFactory; import org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext; import java.sql.Connection; @@ -84,7 +85,9 @@ public final class OpenGaussSystemCatalogAdminQueryExecutor implements DatabaseA public void execute(final ConnectionSession connectionSession) throws SQLException { MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts(); JDBCExecutor jdbcExecutor = new JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), connectionSession.getConnectionContext()); - try (SQLFederationEngine sqlFederationEngine = new SQLFederationEngine(databaseName, PG_CATALOG, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor)) { + try ( + SQLFederationEngine sqlFederationEngine = + SQLFederationEngineFactory.getInstance().newInstance(databaseName, PG_CATALOG, metaDataContexts.getMetaData(), metaDataContexts.getStatistics(), jdbcExecutor)) { DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(metaDataContexts, connectionSession); SQLFederationContext context = new SQLFederationContext(false, new QueryContext(sqlStatementContext, sql, parameters, SQLHintUtils.extractHint(sql), connectionSession.getConnectionContext(), metaDataContexts.getMetaData()),