This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a696de87196 Add FederationResultSetMetaData to implement
AdvancedFederationExecutor logic (#19712)
a696de87196 is described below
commit a696de87196885f532b07c9a5b5c2b2bc95c8e06
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Sat Jul 30 17:25:29 2022 +0800
Add FederationResultSetMetaData to implement AdvancedFederationExecutor
logic (#19712)
---
.../advanced/AdvancedFederationExecutor.java | 29 ++--
.../advanced/resultset/FederationResultSet.java | 13 +-
.../resultset/FederationResultSetMetaData.java | 176 +++++++++++++++++++++
.../executor/original/schema/FilterableSchema.java | 1 +
4 files changed, 206 insertions(+), 13 deletions(-)
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
index efa13524d4b..6782cd3f8aa 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/AdvancedFederationExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.federation.executor.advanced;
+import com.google.common.base.Preconditions;
import org.apache.calcite.adapter.enumerable.EnumerableInterpretable;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.config.CalciteConnectionConfig;
@@ -30,6 +31,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.Bindable;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.eventbus.EventBusContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
@@ -48,6 +51,7 @@ import
org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerCon
import
org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory;
import
org.apache.shardingsphere.infra.federation.optimizer.metadata.FederationSchemaMetaData;
import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.Connection;
@@ -91,22 +95,29 @@ public final class AdvancedFederationExecutor implements
FederationExecutor {
@Override
public ResultSet executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final JDBCExecutorCallback<? extends
ExecuteResult> callback, final FederationContext federationContext) throws
SQLException {
- SQLStatement sqlStatement =
federationContext.getLogicSQL().getSqlStatementContext().getSqlStatement();
- Enumerator<Object[]> enumerator = execute(sqlStatement,
federationContext, prepareEngine, callback).enumerator();
- resultSet = new FederationResultSet(enumerator,
federationContext.getLogicSQL().getSqlStatementContext());
+ SQLStatementContext<?> sqlStatementContext =
federationContext.getLogicSQL().getSqlStatementContext();
+ Preconditions.checkArgument(sqlStatementContext instanceof
SelectStatementContext, "SQL statement context must be select statement
context.");
+ ShardingSphereSchema schema =
federationContext.getDatabases().get(databaseName.toLowerCase()).getSchema(schemaName);
+ FilterableSchema filterableSchema =
createFilterableSchema(prepareEngine, schema, callback, federationContext);
+ Enumerator<Object[]> enumerator =
execute(sqlStatementContext.getSqlStatement(), filterableSchema).enumerator();
+ resultSet = new FederationResultSet(enumerator, schema,
filterableSchema, sqlStatementContext);
return resultSet;
}
- @SuppressWarnings("unchecked")
- private Enumerable<Object[]> execute(final SQLStatement sqlStatement,
final FederationContext federationContext, final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback) {
+ private FilterableSchema createFilterableSchema(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
final ShardingSphereSchema schema,
+ final
JDBCExecutorCallback<? extends ExecuteResult> callback, final FederationContext
federationContext) {
FilterableTableScanExecutorContext executorContext = new
FilterableTableScanExecutorContext(databaseName, schemaName, props,
federationContext);
FilterableTableScanExecutor executor = new
FilterableTableScanExecutor(prepareEngine, jdbcExecutor, callback,
optimizerContext, globalRuleMetaData, executorContext, eventBusContext);
+ FederationSchemaMetaData schemaMetaData = new
FederationSchemaMetaData(schemaName, schema.getTables());
+ return new FilterableSchema(schemaMetaData, executor);
+ }
+
+ @SuppressWarnings("unchecked")
+ private Enumerable<Object[]> execute(final SQLStatement sqlStatement,
final FilterableSchema filterableSchema) {
+ // TODO remove OptimizerPlannerContextFactory call and use setup
executor to handle this logic
CalciteConnectionConfig connectionConfig = new
CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties());
RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl();
- FederationSchemaMetaData schemaMetaData = new
FederationSchemaMetaData(schemaName,
federationContext.getDatabases().get(databaseName).getSchema(schemaName).getTables());
- // TODO remove OptimizerPlannerContextFactory call and use setup
executor to handle this logic
- CalciteCatalogReader catalogReader =
OptimizerPlannerContextFactory.createCatalogReader(schemaName, new
FilterableSchema(schemaMetaData, executor), relDataTypeFactory,
connectionConfig);
+ CalciteCatalogReader catalogReader =
OptimizerPlannerContextFactory.createCatalogReader(schemaName,
filterableSchema, relDataTypeFactory, connectionConfig);
SqlValidator validator =
OptimizerPlannerContextFactory.createValidator(catalogReader,
relDataTypeFactory, connectionConfig);
SqlToRelConverter converter =
OptimizerPlannerContextFactory.createConverter(catalogReader, validator,
relDataTypeFactory);
RelNode bestPlan = new ShardingSphereOptimizer(optimizerContext,
converter).optimize(databaseName, schemaName, sqlStatement);
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
index c1fba3fd068..9cfed743252 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSet.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.linq4j.Enumerator;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
import
org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtil;
+import
org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import java.io.InputStream;
import java.io.Reader;
@@ -61,15 +64,18 @@ public final class FederationResultSet extends
AbstractUnsupportedOperationResul
private final Map<String, Integer> columnLabelAndIndexMap;
+ private final FederationResultSetMetaData resultSetMetaData;
+
private Object[] currentRows;
private boolean wasNull;
private boolean closed;
- public FederationResultSet(final Enumerator<Object[]> enumerator, final
SQLStatementContext<?> sqlStatementContext) {
+ public FederationResultSet(final Enumerator<Object[]> enumerator, final
ShardingSphereSchema schema, final FilterableSchema filterableSchema, final
SQLStatementContext<?> sqlStatementContext) {
this.enumerator = enumerator;
columnLabelAndIndexMap =
createColumnLabelAndIndexMap(sqlStatementContext);
+ resultSetMetaData = new FederationResultSetMetaData(schema,
filterableSchema, new JavaTypeFactoryImpl(), (SelectStatementContext)
sqlStatementContext);
}
private Map<String, Integer> createColumnLabelAndIndexMap(final
SQLStatementContext<?> sqlStatementContext) {
@@ -312,8 +318,7 @@ public final class FederationResultSet extends
AbstractUnsupportedOperationResul
@Override
public ResultSetMetaData getMetaData() throws SQLException {
- // TODO implement getMetaData for federation resultset
- return null;
+ return resultSetMetaData;
}
@Override
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
new file mode 100644
index 00000000000..fbd2f44880d
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/resultset/FederationResultSetMetaData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.federation.executor.advanced.resultset;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.Table;
+import
org.apache.shardingsphere.infra.binder.segment.select.projection.Projection;
+import
org.apache.shardingsphere.infra.binder.segment.select.projection.impl.ColumnProjection;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.database.DefaultDatabase;
+import
org.apache.shardingsphere.infra.federation.executor.original.schema.FilterableSchema;
+import
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Federation result set meta data.
+ */
+@RequiredArgsConstructor
+public final class FederationResultSetMetaData extends WrapperAdapter
implements ResultSetMetaData {
+
+ private final ShardingSphereSchema schema;
+
+ private final FilterableSchema filterableSchema;
+
+ private final RelDataTypeFactory relDataTypeFactory;
+
+ private final SelectStatementContext selectStatementContext;
+
+ @Override
+ public int getColumnCount() throws SQLException {
+ return
selectStatementContext.getProjectionsContext().getExpandProjections().size();
+ }
+
+ @Override
+ public boolean isAutoIncrement(final int column) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isCaseSensitive(final int column) {
+ return true;
+ }
+
+ @Override
+ public boolean isSearchable(final int column) {
+ return false;
+ }
+
+ @Override
+ public boolean isCurrency(final int column) {
+ return false;
+ }
+
+ @Override
+ public int isNullable(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional ->
Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() ||
table.get().getRowType(relDataTypeFactory).isNullable() ?
ResultSetMetaData.columnNullable : ResultSetMetaData.columnNoNulls;
+ }
+
+ @Override
+ public boolean isSigned(final int column) throws SQLException {
+ return true;
+ }
+
+ @Override
+ public int getColumnDisplaySize(final int column) {
+ return findTableName(column).flatMap(optional ->
Optional.ofNullable(filterableSchema.getTable(optional))).map(optional ->
optional.getRowType(relDataTypeFactory).getPrecision()).orElse(0);
+ }
+
+ @Override
+ public String getColumnLabel(final int column) throws SQLException {
+ Projection projection =
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
- 1);
+ if (projection instanceof ColumnProjection) {
+ return ((ColumnProjection) projection).getName();
+ }
+ return projection.getColumnLabel();
+ }
+
+ @Override
+ public String getColumnName(final int column) throws SQLException {
+ Projection projection =
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
- 1);
+ if (projection instanceof ColumnProjection) {
+ return ((ColumnProjection) projection).getName();
+ }
+ return projection.getColumnLabel();
+ }
+
+ @Override
+ public String getSchemaName(final int column) {
+ return DefaultDatabase.LOGIC_NAME;
+ }
+
+ @Override
+ public int getPrecision(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional ->
Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() || RelDataType.PRECISION_NOT_SPECIFIED ==
table.get().getRowType(relDataTypeFactory).getPrecision() ? 0 :
table.get().getRowType(relDataTypeFactory).getPrecision();
+ }
+
+ @Override
+ public int getScale(final int column) {
+ Optional<Table> table = findTableName(column).flatMap(optional ->
Optional.ofNullable(filterableSchema.getTable(optional)));
+ return !table.isPresent() || RelDataType.SCALE_NOT_SPECIFIED ==
table.get().getRowType(relDataTypeFactory).getScale() ? 0 :
table.get().getRowType(relDataTypeFactory).getScale();
+ }
+
+ @Override
+ public String getTableName(final int column) throws SQLException {
+ return findTableName(column).orElse("");
+ }
+
+ @Override
+ public String getCatalogName(final int column) {
+ return DefaultDatabase.LOGIC_NAME;
+ }
+
+ @Override
+ public int getColumnType(final int column) throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public String getColumnTypeName(final int column) throws SQLException {
+ return "";
+ }
+
+ @Override
+ public boolean isReadOnly(final int column) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean isWritable(final int column) {
+ return false;
+ }
+
+ @Override
+ public boolean isDefinitelyWritable(final int column) {
+ return false;
+ }
+
+ @Override
+ public String getColumnClassName(final int column) {
+ return "";
+ }
+
+ private Optional<String> findTableName(final int column) {
+ Projection projection =
selectStatementContext.getProjectionsContext().getExpandProjections().get(column
- 1);
+ if (projection instanceof ColumnProjection) {
+ Map<String, String> tableNamesByColumnProjection =
+
selectStatementContext.getTablesContext().findTableNamesByColumnProjection(Collections.singletonList((ColumnProjection)
projection), schema);
+ return
Optional.of(tableNamesByColumnProjection.get(projection.getExpression()));
+ }
+ return Optional.empty();
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
index 318823b777b..ca851972870 100644
--- a/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
+++ b/shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/original/schema/FilterableSchema.java
@@ -47,6 +47,7 @@ public final class FilterableSchema extends AbstractSchema {
private Map<String, Table> createTableMap(final FederationSchemaMetaData
schemaMetaData, final FilterableTableScanExecutor executor) {
Map<String, Table> result = new
LinkedHashMap<>(schemaMetaData.getTables().size(), 1);
for (FederationTableMetaData each :
schemaMetaData.getTables().values()) {
+ // TODO implement table statistic logic after using custom
operators
result.put(each.getName(), new FilterableTable(each, executor, new
FederationTableStatistic()));
}
return result;