strongduanmu commented on code in PR #20186: URL: https://github.com/apache/shardingsphere/pull/20186#discussion_r945478291
########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableFilterRule.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule for pushing filters into table scan. + */ +public class TranslatableFilterRule extends RelOptRule { + + public static final TranslatableFilterRule INSTANCE = + new TranslatableFilterRule(RelFactories.LOGICAL_BUILDER); + + /** + * Creates a TranslatableFilterRule. 
+ * + * @param relBuilderFactory Builder for relational expressions + */ + public TranslatableFilterRule(final RelBuilderFactory relBuilderFactory) { + super( + operand(LogicalFilter.class, + operand(TranslatableTableScan.class, none())), + relBuilderFactory, + "TranslatableFilterRule"); + } + + @Override + public void onMatch(final RelOptRuleCall call) { + final LogicalFilter filter = call.rel(0); Review Comment: Please remove final modifier. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableProjectFilterRule.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule for pushing projections and filters into table scan. + */ +public class TranslatableProjectFilterRule extends RelOptRule { + + public static final TranslatableProjectFilterRule INSTANCE = + new TranslatableProjectFilterRule(RelFactories.LOGICAL_BUILDER); + + /** + * Creates a TranslatableProjectFilterRule. + * + * @param relBuilderFactory Builder for relational expressions + */ + public TranslatableProjectFilterRule(final RelBuilderFactory relBuilderFactory) { + super( + operand(LogicalProject.class, + operand(LogicalFilter.class, + operand(TranslatableTableScan.class, none()))), + relBuilderFactory, + "TranslatableProjectFilterRule"); + } + + @Override + public void onMatch(final RelOptRuleCall call) { + final LogicalProject project = call.rel(0); + final LogicalFilter filter = call.rel(1); + final TranslatableTableScan scan = call.rel(2); + RexNode filterNode = filter.getCondition(); + List filters = new ArrayList(); + filters.add(filterNode); + int[] fields = getProjectFields(project.getProjects()); + if (fields == null) { + return; + } + call.transformTo( + new TranslatableTableScan( + scan.getCluster(), + scan.getTable(), + scan.getTranslatableTable(), + filters, + fields)); + } + + private int[] getProjectFields(final List<RexNode> exps) { + final int[] fields = new int[exps.size()]; + for (int i = 0; i < exps.size(); i++) { + final RexNode exp = exps.get(i); + if (exp instanceof RexInputRef) { + fields[i] = 
((RexInputRef) exp).getIndex(); + } else { + return null; + } + } + return fields; Review Comment: Please rename fields to result. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableProjectRule.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.List; + +/** + * Planner rule for pushing projections into table scan. + */ +public class TranslatableProjectRule extends RelOptRule { + + public static final TranslatableProjectRule INSTANCE = new TranslatableProjectRule(RelFactories.LOGICAL_BUILDER); + + /** + * Creates a TranslatableProjectRule. 
+ * + * @param relBuilderFactory Builder for relational expressions + */ + public TranslatableProjectRule(final RelBuilderFactory relBuilderFactory) { + super(operand(LogicalProject.class, operand(TranslatableTableScan.class, none())), + relBuilderFactory, "TranslatableProjectRule"); + } + + @Override + public void onMatch(final RelOptRuleCall call) { + final LogicalProject project = call.rel(0); Review Comment: Please remove final. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutorFactory.java: ########## @@ -34,20 +34,20 @@ public final class FederationExecutorFactory { /** * Create new instance of federation executor factory. - * - * @param databaseName database name - * @param schemaName schema name + * + * @param databaseName database name + * @param schemaName schema name * @param globalRuleMetaData global rule meta data - * @param optimizerContext filterable optimizer context - * @param props configuration properties - * @param jdbcExecutor jdbc executor - * @param eventBusContext event bus context + * @param optimizerContext filterable optimizer context + * @param props configuration properties + * @param jdbcExecutor jdbc executor + * @param eventBusContext event bus context * @return created instance */ public static FederationExecutor newInstance(final String databaseName, final String schemaName, final OptimizerContext optimizerContext, final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props, final JDBCExecutor jdbcExecutor, final EventBusContext eventBusContext) { - // TODO Consider about AdvancedFederationExecutor + // TODO Consider about AdvancedFederationExecutor and TranslatableFederationExecutor Review Comment: Can we merge TranslatableFederationExecutor into AdvancedFederationExecutor? 
########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableProjectRule.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.List; + +/** + * Planner rule for pushing projections into table scan. + */ +public class TranslatableProjectRule extends RelOptRule { + + public static final TranslatableProjectRule INSTANCE = new TranslatableProjectRule(RelFactories.LOGICAL_BUILDER); + + /** + * Creates a TranslatableProjectRule. 
+ * + * @param relBuilderFactory Builder for relational expressions + */ + public TranslatableProjectRule(final RelBuilderFactory relBuilderFactory) { + super(operand(LogicalProject.class, operand(TranslatableTableScan.class, none())), + relBuilderFactory, "TranslatableProjectRule"); + } + + @Override + public void onMatch(final RelOptRuleCall call) { + final LogicalProject project = call.rel(0); + final TranslatableTableScan scan = call.rel(1); + int[] fields = getProjectFields(project.getProjects()); + if (fields == null) { + return; + } + call.transformTo( + new TranslatableTableScan( + scan.getCluster(), + scan.getTable(), + scan.getTranslatableTable(), + scan.getFilters(), + fields)); + } + + private int[] getProjectFields(final List<RexNode> exps) { + final int[] fields = new int[exps.size()]; + for (int i = 0; i < exps.size(); i++) { + final RexNode exp = exps.get(i); + if (exp instanceof RexInputRef) { + fields[i] = ((RexInputRef) exp).getIndex(); + } else { + return null; + } + } + return fields; Review Comment: Please rename fields to result. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableProjectFilterRule.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Planner rule for pushing projections and filters into table scan. + */ +public class TranslatableProjectFilterRule extends RelOptRule { + + public static final TranslatableProjectFilterRule INSTANCE = + new TranslatableProjectFilterRule(RelFactories.LOGICAL_BUILDER); + + /** + * Creates a TranslatableProjectFilterRule. + * + * @param relBuilderFactory Builder for relational expressions + */ + public TranslatableProjectFilterRule(final RelBuilderFactory relBuilderFactory) { + super( + operand(LogicalProject.class, + operand(LogicalFilter.class, + operand(TranslatableTableScan.class, none()))), + relBuilderFactory, + "TranslatableProjectFilterRule"); + } + + @Override + public void onMatch(final RelOptRuleCall call) { + final LogicalProject project = call.rel(0); Review Comment: Please remove final modifier. 
########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableTableScan.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import com.google.common.collect.ImmutableList; +import lombok.Getter; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.codehaus.groovy.runtime.InvokerHelper; + +import java.util.Arrays; +import java.util.List; + +/** + * Relational expression representing a scan. + * Like any table scan, it serves as a leaf node of a query tree. + */ +@Getter +public class TranslatableTableScan extends TableScan implements EnumerableRel { Review Comment: Can we rename TranslatableTableScan to ShardingSphereTableScan? 
########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/table/TranslatableRowEnumerator.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.executor.advanced.table; + +import lombok.RequiredArgsConstructor; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.shardingsphere.infra.exception.ShardingSphereException; +import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData; +import org.apache.shardingsphere.infra.merge.result.MergedResult; + +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; + +/** + * Translatable row enumerator. + */ +@RequiredArgsConstructor +public final class TranslatableRowEnumerator implements Enumerator<Object[]> { Review Comment: Can we use the existing FilterableRowEnumerator?
########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/TranslatableFederationExecutor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.federation.executor.advanced; + +import com.google.common.base.Preconditions; +import org.apache.calcite.adapter.enumerable.EnumerableInterpretable; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.runtime.Bindable; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext; +import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; +import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.federation.executor.FederationContext; +import org.apache.shardingsphere.infra.federation.executor.FederationExecutor; +import org.apache.shardingsphere.infra.federation.executor.advanced.resultset.FederationResultSet; +import org.apache.shardingsphere.infra.federation.executor.advanced.table.TranslatableTableScanExecutor; +import 
org.apache.shardingsphere.infra.federation.executor.advanced.table.TranslatableTableScanExecutorContext; +import org.apache.shardingsphere.infra.federation.optimizer.ShardingSphereOptimizer; +import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext; +import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory; +import org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable.TranslatableSchema; +import org.apache.shardingsphere.infra.federation.optimizer.planner.QueryOptimizePlannerFactory; +import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData; +import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Advanced federation executor. + */ +public final class TranslatableFederationExecutor implements FederationExecutor { Review Comment: Please merge TranslatableFederationExecutor into AdvancedFederationExecutor. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/table/EmptyRowEnumerator.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.executor.advanced.table; + +import org.apache.calcite.linq4j.Enumerator; + +/** + * Empty row enumerator. + */ +public final class EmptyRowEnumerator implements Enumerator<Object[]> { Review Comment: Can we use the existing EmptyRowEnumerator? ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/TranslatableTableScan.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import com.google.common.collect.ImmutableList; +import lombok.Getter; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.codehaus.groovy.runtime.InvokerHelper; + +import java.util.Arrays; +import java.util.List; + +/** + * Relational expression representing a scan. + * Like any table scan, it serves as a leaf node of a query tree. 
+ */ +@Getter +public class TranslatableTableScan extends TableScan implements EnumerableRel { + + private final FederationTranslatableTable translatableTable; + + private final int[] fields; + + private final List<RexNode> filters; + + public TranslatableTableScan(final RelOptCluster cluster, final RelOptTable table, + final FederationTranslatableTable translatableTable, final int[] fields) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table); + this.translatableTable = translatableTable; + this.fields = fields; + this.filters = null; + assert translatableTable != null; + } + + public TranslatableTableScan(final RelOptCluster cluster, final RelOptTable table, + final FederationTranslatableTable translatableTable, final List<RexNode> filters, final int[] fields) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), ImmutableList.of(), table); + this.translatableTable = translatableTable; + this.fields = fields; + this.filters = filters; + assert translatableTable != null; + } + + @Override + public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) { + assert inputs.isEmpty(); + return new TranslatableTableScan(getCluster(), table, translatableTable, fields); + } + + @Override + public String toString() { + if (null != filters) { + final String[] filterValues = new String[fields.length]; + addFilter(filters, filterValues); + return "TranslatableTableScan{" + + "translatableTable=" + translatableTable + + ", fields=" + Arrays.toString(fields) + + ", filters=" + Arrays.toString(filterValues) + + '}'; + } + return "TranslatableTableScan{" + + "translatableTable=" + translatableTable + + ", fields=" + Arrays.toString(fields) + + '}'; + } + + @Override + public RelWriter explainTerms(final RelWriter pw) { + if (null != filters) { + final String[] filterValues = new String[fields.length]; + addFilter(filters, filterValues); + return super.explainTerms(pw) + .item("fields", 
Primitive.asList(fields)) + .item("filters", Primitive.asList(filterValues)); + } + return super.explainTerms(pw) + .item("fields", Primitive.asList(fields)); + } + + @Override + public RelDataType deriveRowType() { + final List<RelDataTypeField> fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.Builder builder = + getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override + public void register(final RelOptPlanner planner) { + planner.addRule(TranslatableProjectFilterRule.INSTANCE); + planner.addRule(TranslatableFilterRule.INSTANCE); + planner.addRule(TranslatableProjectRule.INSTANCE); + } + + @Override + public RelOptCost computeSelfCost(final RelOptPlanner planner, + final RelMetadataQuery mq) { + // Multiply the cost by a factor that makes a scan more attractive if it Review Comment: Please remove useless java doc. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-optimizer/src/main/java/org/apache/shardingsphere/infra/federation/optimizer/metadata/translatable/FederationTranslatableTable.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.infra.federation.optimizer.metadata.translatable; + +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.avatica.SqlType; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactory.Builder; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.shardingsphere.infra.federation.optimizer.executor.TranslatableScanNodeExecutorContext; +import org.apache.shardingsphere.infra.federation.optimizer.executor.TableScanExecutor; +import org.apache.shardingsphere.infra.federation.optimizer.metadata.statistic.FederationStatistic; +import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereColumn; +import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable; + +import java.lang.reflect.Type; + +/** + * Federation Translatable table. 
+ */ +@RequiredArgsConstructor +public final class FederationTranslatableTable extends AbstractTable implements QueryableTable, TranslatableTable { + + private final ShardingSphereTable table; + + private final TableScanExecutor executor; + + private final FederationStatistic statistic; + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + return createRelDataType(table, typeFactory); + } + + /** + * Execute filter and project when query the federation translatable table. + * + * @param root data context + * @param filterValues right value in filter condition + * @param projects fields to be projected + * @return enumerable result + */ + public Enumerable<Object[]> projectAndFilter(final DataContext root, final String[] filterValues, final int[] projects) { + return executor.execute(table, new TranslatableScanNodeExecutorContext(root, filterValues, projects)); + } + + /** + * Execute filter and project when query the federation translatable table. + * + * @param root data context + * @param projects fields to be projected + * @return enumerable result + */ + public Enumerable<Object[]> project(final DataContext root, final int[] projects) { + return executor.execute(table, new TranslatableScanNodeExecutorContext(root, null, projects)); + } + + @Override + public Expression getExpression(final SchemaPlus schema, final String tableName, + final Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + @Override + public Type getElementType() { + return Object[].class; + } + + @Override + public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, + final SchemaPlus schema, final String tableName) { + throw new UnsupportedOperationException(); + } + + @Override + public RelNode toRel( + final RelOptTable.ToRelContext context, + final RelOptTable relOptTable) { + // Request all fields. 
+ final int fieldCount = relOptTable.getRowType().getFieldCount(); Review Comment: Please remove the `final` modifier here. ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/advanced/table/TranslatableTableScanExecutor.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.infra.federation.executor.advanced.table; + +import com.google.common.base.Strings; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.jdbc.JavaTypeFactoryImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.rel2sql.RelToSqlConverter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.SqlString; +import org.apache.calcite.tools.RelBuilder; +import org.apache.shardingsphere.infra.binder.LogicSQL; +import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory; +import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext; +import org.apache.shardingsphere.infra.context.kernel.KernelProcessor; +import org.apache.shardingsphere.infra.database.type.DatabaseType; +import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine; +import org.apache.shardingsphere.infra.exception.ShardingSphereException; +import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup; +import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext; +import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext; 
+import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor; +import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback; +import org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult; +import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult; +import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResultMetaData; +import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.memory.JDBCMemoryQueryResult; +import org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.stream.JDBCStreamQueryResult; +import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine; +import org.apache.shardingsphere.infra.executor.sql.process.ExecuteProcessEngine; +import org.apache.shardingsphere.infra.federation.executor.FederationContext; +import org.apache.shardingsphere.infra.federation.executor.original.SQLDialectFactory; +import org.apache.shardingsphere.infra.federation.optimizer.context.OptimizerContext; +import org.apache.shardingsphere.infra.federation.optimizer.context.planner.OptimizerPlannerContextFactory; +import org.apache.shardingsphere.infra.federation.optimizer.executor.TableScanExecutor; +import org.apache.shardingsphere.infra.federation.optimizer.executor.ScanNodeExecutorContext; +import org.apache.shardingsphere.infra.federation.optimizer.executor.TranslatableScanNodeExecutorContext; +import org.apache.shardingsphere.infra.federation.optimizer.metadata.filter.FilterableSchema; +import org.apache.shardingsphere.infra.federation.optimizer.planner.QueryOptimizePlannerFactory; +import org.apache.shardingsphere.infra.merge.MergeEngine; +import org.apache.shardingsphere.infra.merge.result.MergedResult; +import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData; +import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema; +import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable; +import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine; +import org.apache.shardingsphere.infra.session.ConnectionContext; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Translatable table scan executor. + */ +@RequiredArgsConstructor +public final class TranslatableTableScanExecutor implements TableScanExecutor { + + private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine; + + private final JDBCExecutor jdbcExecutor; + + private final JDBCExecutorCallback<? 
extends ExecuteResult> callback; + + private final OptimizerContext optimizerContext; + + private final ShardingSphereRuleMetaData globalRuleMetaData; + + private final TranslatableTableScanExecutorContext executorContext; + + private final EventBusContext eventBusContext; + + @Override + public Enumerable<Object[]> execute(final ShardingSphereTable table, final ScanNodeExecutorContext scanContext) { + String databaseName = executorContext.getDatabaseName(); + String schemaName = executorContext.getSchemaName(); + DatabaseType databaseType = DatabaseTypeEngine.getTrunkDatabaseType(optimizerContext.getParserContexts().get(databaseName).getDatabaseType().getType()); + SqlString sqlString = createSQLString(table, (TranslatableScanNodeExecutorContext) scanContext, SQLDialectFactory.getSQLDialect(databaseType)); + // TODO replace sql parse with sql convert + FederationContext federationContext = executorContext.getFederationContext(); + LogicSQL logicSQL = createLogicSQL(federationContext.getDatabases(), sqlString, databaseType); + ShardingSphereDatabase database = federationContext.getDatabases().get(databaseName.toLowerCase()); + ExecutionContext context = new KernelProcessor().generateExecutionContext(logicSQL, database, globalRuleMetaData, executorContext.getProps(), new ConnectionContext()); + if (federationContext.isPreview() || databaseType.getSystemSchemas().contains(schemaName)) { + federationContext.getExecutionUnits().addAll(context.getExecutionUnits()); + return createEmptyEnumerable(); + } + return execute(databaseType, logicSQL, database, context); + } + + private AbstractEnumerable<Object[]> execute(final DatabaseType databaseType, final LogicSQL logicSQL, final ShardingSphereDatabase database, final ExecutionContext context) { + try { + ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = prepareEngine.prepare(context.getRouteContext(), context.getExecutionUnits()); + setParameters(executionGroupContext.getInputGroups()); + 
ExecuteProcessEngine.initialize(context.getLogicSQL(), executionGroupContext, eventBusContext); + List<QueryResult> queryResults = execute(executionGroupContext, databaseType); + ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(), eventBusContext); + MergeEngine mergeEngine = new MergeEngine(database, executorContext.getProps(), new ConnectionContext()); + MergedResult mergedResult = mergeEngine.merge(queryResults, logicSQL.getSqlStatementContext()); + Collection<Statement> statements = getStatements(executionGroupContext.getInputGroups()); + return createEnumerable(mergedResult, queryResults.get(0).getMetaData(), statements); + } catch (final SQLException ex) { + throw new ShardingSphereException(ex); + } finally { + ExecuteProcessEngine.clean(); + } + } + + private List<QueryResult> execute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final DatabaseType databaseType) throws SQLException { + Collection<QueryResult> queryResults = jdbcExecutor.execute(executionGroupContext, callback).stream().map(each -> (QueryResult) each).collect(Collectors.toList()); + List<QueryResult> result = new LinkedList<>(); + for (QueryResult each : queryResults) { + QueryResult queryResult = each instanceof JDBCStreamQueryResult + ? 
new JDBCMemoryQueryResult(((JDBCStreamQueryResult) each).getResultSet(), databaseType) + : each; + result.add(queryResult); + } + return result; + } + + private Collection<Statement> getStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) { + Collection<Statement> result = new LinkedList<>(); + for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) { + for (JDBCExecutionUnit executionUnit : each.getInputs()) { + result.add(executionUnit.getStorageResource()); + } + } + return result; + } + + private SqlString createSQLString(final ShardingSphereTable table, final TranslatableScanNodeExecutorContext scanContext, final SqlDialect sqlDialect) { + return new RelToSqlConverter(sqlDialect).visitRoot(createRelNode(table, scanContext)).asStatement().toSqlString(sqlDialect); + } + + private void setParameters(final Collection<ExecutionGroup<JDBCExecutionUnit>> inputGroups) { + for (ExecutionGroup<JDBCExecutionUnit> each : inputGroups) { + for (JDBCExecutionUnit executionUnit : each.getInputs()) { + if (!(executionUnit.getStorageResource() instanceof PreparedStatement)) { + continue; + } + setParameters((PreparedStatement) executionUnit.getStorageResource(), executionUnit.getExecutionUnit().getSqlUnit().getParameters()); + } + } + } + + @SneakyThrows(SQLException.class) + private void setParameters(final PreparedStatement preparedStatement, final List<Object> parameters) { + for (int i = 0; i < parameters.size(); i++) { + Object parameter = parameters.get(i); + preparedStatement.setObject(i + 1, parameter); + } + } + + private RelNode createRelNode(final ShardingSphereTable table, final TranslatableScanNodeExecutorContext scanContext) { + String databaseName = executorContext.getDatabaseName(); + String schemaName = executorContext.getSchemaName(); + CalciteConnectionConfig connectionConfig = new CalciteConnectionConfigImpl(OptimizerPlannerContextFactory.createConnectionProperties()); + ShardingSphereSchema schema = 
executorContext.getFederationContext().getDatabases().get(databaseName).getSchema(schemaName); + CalciteCatalogReader catalogReader = OptimizerPlannerContextFactory.createCatalogReader(schemaName, + new FilterableSchema(schemaName, schema, null), new JavaTypeFactoryImpl(), connectionConfig); + RelOptCluster relOptCluster = RelOptCluster.create(QueryOptimizePlannerFactory.createVolcanoPlanner(), new RexBuilder(new JavaTypeFactoryImpl())); + RelBuilder builder = RelFactories.LOGICAL_BUILDER.create(relOptCluster, catalogReader).scan(table.getName()); + if (null != scanContext.getFilterValues()) { + builder.filter(createFilters(scanContext.getFilterValues(), builder, table.getColumnNames())); + } + if (null != scanContext.getProjects()) { + builder.project(createProjections(scanContext.getProjects(), builder, table.getColumnNames())); + } + return builder.build(); + } + + private Collection<RexNode> createFilters(final String[] filterValues, final RelBuilder builder, final List<String> columnNames) { Review Comment: Can we move this logic to the entrance, and remove TranslatableTableScanExecutor? ########## shardingsphere-infra/shardingsphere-infra-federation/shardingsphere-infra-federation-executor/src/main/java/org/apache/shardingsphere/infra/federation/executor/FederationExecutorFactory.java: ########## @@ -34,20 +34,20 @@ public final class FederationExecutorFactory { /** * Create new instance of federation executor factory. - * - * @param databaseName database name - * @param schemaName schema name + * + * @param databaseName database name Review Comment: Please import default IDEA settings to keep same code style. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
