godfreyhe commented on code in PR #20462:
URL: https://github.com/apache/flink/pull/20462#discussion_r939532490


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+    /**
+     * For the input join node, judge whether the join's left side and right side meet the
+     * requirements of dynamic partition pruning. Return true if the requirements are met.
+     */
+    public static boolean supportDynamicPartitionPruning(Join join) {

Review Comment:
   Can DynamicPartitionPruningRule#doMatches be replaced with this method?
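   A rough sketch of what that replacement might look like (illustrative only; this
   assumes the rule's matches hook can obtain the Join node, and that doMatches has no
   extra checks beyond the ones in this utility):

       // hypothetical delegation inside DynamicPartitionPruningRule
       @Override
       public boolean matches(RelOptRuleCall call) {
           final Join join = call.rel(0);
           // reuse the shared checks instead of duplicating them in doMatches
           return DynamicPartitionPruningUtils.supportDynamicPartitionPruning(join);
       }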



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+    /**
+     * For the input join node, judge whether the join's left side and right side meet the
+     * requirements of dynamic partition pruning. Return true if the requirements are met.
+     */
+    public static boolean supportDynamicPartitionPruning(Join join) {
+        if (!ShortcutUtils.unwrapContext(join)
+                .getTableConfig()
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+            return false;
+        }
+        // Now dynamic partition pruning supports left/right join, inner join and semi join,
+        // but semi join cannot participate in join reorder yet.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
+        }
+
+        JoinInfo joinInfo = join.analyzeCondition();
+        if (joinInfo.leftKeys.isEmpty()) {
+            return false;
+        }
+        RelNode left = join.getLeft();
+        RelNode right = join.getRight();
+
+        // TODO For now, the fact side and dim side don't support many complex patterns, such as
+        // a join or an aggregate inside the fact/dim side. These will be supported later.
+        ImmutableIntList leftPartitionKeys =
+                extractPartitionKeysFromFactSide(left, joinInfo.leftKeys);
+        if (!leftPartitionKeys.isEmpty()) {
+            boolean rightIsDim =
+                    isDimSide(
+                            right,
+                            getDimSidePartitionKeys(
+                                    joinInfo.leftKeys, joinInfo.rightKeys, leftPartitionKeys));
+            if (rightIsDim) {
+                return true;
+            }
+        }
+
+        ImmutableIntList rightPartitionKeys =
+                extractPartitionKeysFromFactSide(right, joinInfo.rightKeys);
+        if (!rightPartitionKeys.isEmpty()) {
+            return isDimSide(
+                    left,
+                    getDimSidePartitionKeys(
+                            joinInfo.rightKeys, joinInfo.leftKeys, rightPartitionKeys));
+        }
+        return false;
+    }
+
+    /**
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * the join keys in the dim side need not be considered, because they are already handled by
+     * the dynamic partition pruning rule. If joinKeys is not null, judge whether the join keys
+     * are changed in the dim side; if they are changed, this RelNode is not the dim side.
+     */
+    public static boolean isDimSide(RelNode rel, @Nullable ImmutableIntList joinKeys) {

Review Comment:
   private
   
   joinKeys should not be null in the current situation
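   A minimal sketch of the suggested signature change (illustrative only; it assumes all
   current callers pass a non-null key list):

       // drop @Nullable and narrow the visibility to the utility class
       private static boolean isDimSide(RelNode rel, ImmutableIntList joinKeys) {
           DppDimSideFactors dimSideFactors = new DppDimSideFactors();
           visitDimSide(rel, dimSideFactors, joinKeys);
           return dimSideFactors.isDimSide();
       }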



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdRowCount.scala:
##########
@@ -335,10 +336,19 @@ class FlinkRelMdRowCount private extends MetadataHandler[BuiltInMetadata.RowCoun
       fmq.getSelectivity(joinWithOnlyEquiPred, nonEquiPred)
     }
 
+    /** Judge */
+    val dynamicPartitionPruningFactor =

Review Comment:
   add some comments to explain why we use a factor here



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+    /**
+     * For the input join node, judge whether the join's left side and right side meet the
+     * requirements of dynamic partition pruning. Return true if the requirements are met.
+     */
+    public static boolean supportDynamicPartitionPruning(Join join) {
+        if (!ShortcutUtils.unwrapContext(join)
+                .getTableConfig()
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+            return false;
+        }
+        // Now dynamic partition pruning supports left/right join, inner join and semi join,
+        // but semi join cannot participate in join reorder yet.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
+        }
+
+        JoinInfo joinInfo = join.analyzeCondition();
+        if (joinInfo.leftKeys.isEmpty()) {
+            return false;
+        }
+        RelNode left = join.getLeft();
+        RelNode right = join.getRight();
+
+        // TODO For now, the fact side and dim side don't support many complex patterns, such as
+        // a join or an aggregate inside the fact/dim side. These will be supported later.
+        ImmutableIntList leftPartitionKeys =
+                extractPartitionKeysFromFactSide(left, joinInfo.leftKeys);
+        if (!leftPartitionKeys.isEmpty()) {
+            boolean rightIsDim =
+                    isDimSide(
+                            right,
+                            getDimSidePartitionKeys(
+                                    joinInfo.leftKeys, joinInfo.rightKeys, leftPartitionKeys));
+            if (rightIsDim) {
+                return true;
+            }
+        }
+
+        ImmutableIntList rightPartitionKeys =
+                extractPartitionKeysFromFactSide(right, joinInfo.rightKeys);
+        if (!rightPartitionKeys.isEmpty()) {
+            return isDimSide(
+                    left,
+                    getDimSidePartitionKeys(
+                            joinInfo.rightKeys, joinInfo.leftKeys, rightPartitionKeys));
+        }
+        return false;
+    }
+
+    /**
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * the join keys in the dim side need not be considered, because they are already handled by
+     * the dynamic partition pruning rule. If joinKeys is not null, judge whether the join keys
+     * are changed in the dim side; if they are changed, this RelNode is not the dim side.
+     */
+    public static boolean isDimSide(RelNode rel, @Nullable ImmutableIntList joinKeys) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors, joinKeys);
+        return dimSideFactors.isDimSide();
+    }
+
+    /**
+     * Visit the dim side to judge whether it has a filter condition and whether its source table
+     * scan is a non-partitioned scan.
+     */
+    public static void visitDimSide(
+            RelNode rel, DppDimSideFactors dimSideFactors, @Nullable ImmutableIntList joinKeys) {
+        // TODO Make visitDimSide more efficient and more accurate, e.g. a filter on the dim
+        // table or a filter on the partition field of the fact table.
+        if (rel instanceof TableScan) {
+            TableScan scan = (TableScan) rel;
+            TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+            if (table == null) {
+                return;
+            }
+            if (!dimSideFactors.hasFilter
+                    && table.abilitySpecs() != null
+                    && table.abilitySpecs().length != 0) {
+                for (SourceAbilitySpec spec : table.abilitySpecs()) {
+                    if (spec instanceof FilterPushDownSpec) {
+                        List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
+                        for (RexNode predicate : predicates) {
+                            if (isSuitableFilter(predicate)) {
+                                dimSideFactors.hasFilter = true;
+                            }
+                        }
+                    }
+                }
+            }
+            CatalogTable catalogTable = table.contextResolvedTable().getTable();
+            dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned();
+        } else if (rel instanceof HepRelVertex) {
+            visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors, joinKeys);
+        } else if (rel instanceof Exchange) {
+            visitDimSide(rel.getInput(0), dimSideFactors, joinKeys);
+        } else if (rel instanceof Project) {
+            // joinKeys is null means need not consider join keys.
+            if (joinKeys != null) {
+                List<RexNode> projects = ((Project) rel).getProjects();
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    dimSideFactors.isSuitableJoinKey = false;
+                }
+            }
+
+            visitDimSide(rel.getInput(0), dimSideFactors, joinKeys);

Review Comment:
   the joinKeys should be inputJoinKeys; consider the following situation: a Project on a Calc with field indexes changed
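   A sketch of the suggested fix for the Project branch (illustrative only): remap the join
   keys to the input's field indices and recurse with the remapped list, so that a Project
   sitting on top of a Calc with changed field indices is still tracked correctly.

       } else if (rel instanceof Project) {
           ImmutableIntList inputJoinKeys = joinKeys;
           // joinKeys is null means we need not consider join keys.
           if (joinKeys != null) {
               List<RexNode> projects = ((Project) rel).getProjects();
               // map the join keys to the Project's input field indices
               inputJoinKeys = getInputIndices(projects, joinKeys);
               if (inputJoinKeys.isEmpty()) {
                   dimSideFactors.isSuitableJoinKey = false;
               }
           }
           // recurse with the remapped keys rather than the original joinKeys
           visitDimSide(rel.getInput(0), dimSideFactors, inputJoinKeys);
       }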



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+    /**
+     * For the input join node, judge whether the join's left side and right side meet the
+     * requirements of dynamic partition pruning. Return true if the requirements are met.
+     */
+    public static boolean supportDynamicPartitionPruning(Join join) {
+        if (!ShortcutUtils.unwrapContext(join)
+                .getTableConfig()
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+            return false;
+        }
+        // Now dynamic partition pruning supports left/right join, inner join and semi join,
+        // but semi join cannot participate in join reorder yet.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
+        }
+
+        JoinInfo joinInfo = join.analyzeCondition();
+        if (joinInfo.leftKeys.isEmpty()) {
+            return false;
+        }
+        RelNode left = join.getLeft();
+        RelNode right = join.getRight();
+
+        // TODO For now, the fact side and dim side don't support many complex patterns, such as
+        // a join or an aggregate inside the fact/dim side. These will be supported later.
+        ImmutableIntList leftPartitionKeys =
+                extractPartitionKeysFromFactSide(left, joinInfo.leftKeys);
+        if (!leftPartitionKeys.isEmpty()) {
+            boolean rightIsDim =
+                    isDimSide(
+                            right,
+                            getDimSidePartitionKeys(
+                                    joinInfo.leftKeys, joinInfo.rightKeys, leftPartitionKeys));
+            if (rightIsDim) {
+                return true;
+            }
+        }
+
+        ImmutableIntList rightPartitionKeys =
+                extractPartitionKeysFromFactSide(right, joinInfo.rightKeys);
+        if (!rightPartitionKeys.isEmpty()) {
+            return isDimSide(
+                    left,
+                    getDimSidePartitionKeys(
+                            joinInfo.rightKeys, joinInfo.leftKeys, rightPartitionKeys));
+        }
+        return false;
+    }
+
+    /**
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * the join keys in the dim side need not be considered, because they are already handled by
+     * the dynamic partition pruning rule. If joinKeys is not null, judge whether the join keys
+     * are changed in the dim side; if they are changed, this RelNode is not the dim side.
+     */
+    public static boolean isDimSide(RelNode rel, @Nullable ImmutableIntList joinKeys) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors, joinKeys);
+        return dimSideFactors.isDimSide();
+    }
+
+    /**
+     * Visit the dim side to judge whether it has a filter condition and whether its source table
+     * scan is a non-partitioned scan.
+     */
+    public static void visitDimSide(

Review Comment:
   private



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
+import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec;
+import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableIntList;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Planner utils for dynamic partition pruning. */
+public class DynamicPartitionPruningUtils {
+
+    /**
+     * For the input join node, judge whether the join's left side and right side meet the
+     * requirements of dynamic partition pruning. Return true if the requirements are met.
+     */
+    public static boolean supportDynamicPartitionPruning(Join join) {
+        if (!ShortcutUtils.unwrapContext(join)
+                .getTableConfig()
+                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
+            return false;
+        }
+        // Now dynamic partition pruning supports left/right join, inner join and semi join,
+        // but semi join cannot participate in join reorder yet.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
+        }
+
+        JoinInfo joinInfo = join.analyzeCondition();
+        if (joinInfo.leftKeys.isEmpty()) {
+            return false;
+        }
+        RelNode left = join.getLeft();
+        RelNode right = join.getRight();
+
+        // TODO For now, the fact side and dim side don't support many complex patterns, such as
+        // a join or an aggregate inside the fact/dim side. These will be supported later.
+        ImmutableIntList leftPartitionKeys =
+                extractPartitionKeysFromFactSide(left, joinInfo.leftKeys);
+        if (!leftPartitionKeys.isEmpty()) {
+            boolean rightIsDim =
+                    isDimSide(
+                            right,
+                            getDimSidePartitionKeys(
+                                    joinInfo.leftKeys, joinInfo.rightKeys, leftPartitionKeys));
+            if (rightIsDim) {
+                return true;
+            }
+        }
+
+        ImmutableIntList rightPartitionKeys =
+                extractPartitionKeysFromFactSide(right, joinInfo.rightKeys);
+        if (!rightPartitionKeys.isEmpty()) {
+            return isDimSide(
+                    left,
+                    getDimSidePartitionKeys(
+                            joinInfo.rightKeys, joinInfo.leftKeys, rightPartitionKeys));
+        }
+        return false;
+    }
+
+    /**
+     * Judge whether the input RelNode meets the conditions of the dim side. If joinKeys is null,
+     * the join keys in the dim side need not be considered, because they are already handled by
+     * the dynamic partition pruning rule. If joinKeys is not null, judge whether the join keys
+     * are changed in the dim side; if they are changed, this RelNode is not the dim side.
+     */
+    public static boolean isDimSide(RelNode rel, @Nullable ImmutableIntList joinKeys) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors, joinKeys);
+        return dimSideFactors.isDimSide();
+    }
+
+    /**
+     * Visit the dim side to judge whether it has a filter condition and whether its source table
+     * scan is a non-partitioned scan.
+     */
+    public static void visitDimSide(
+            RelNode rel, DppDimSideFactors dimSideFactors, @Nullable ImmutableIntList joinKeys) {
+        // TODO Make visitDimSide more efficient and more accurate, e.g. a filter on the dim
+        // table or a filter on the partition field of the fact table.
+        if (rel instanceof TableScan) {
+            TableScan scan = (TableScan) rel;
+            TableSourceTable table = scan.getTable().unwrap(TableSourceTable.class);
+            if (table == null) {
+                return;
+            }
+            if (!dimSideFactors.hasFilter
+                    && table.abilitySpecs() != null
+                    && table.abilitySpecs().length != 0) {
+                for (SourceAbilitySpec spec : table.abilitySpecs()) {
+                    if (spec instanceof FilterPushDownSpec) {
+                        List<RexNode> predicates = ((FilterPushDownSpec) spec).getPredicates();
+                        for (RexNode predicate : predicates) {
+                            if (isSuitableFilter(predicate)) {
+                                dimSideFactors.hasFilter = true;
+                            }
+                        }
+                    }
+                }
+            }
+            CatalogTable catalogTable = table.contextResolvedTable().getTable();
+            dimSideFactors.hasNonPartitionedScan = !catalogTable.isPartitioned();
+        } else if (rel instanceof HepRelVertex) {
+            visitDimSide(((HepRelVertex) rel).getCurrentRel(), dimSideFactors, joinKeys);
+        } else if (rel instanceof Exchange) {
+            visitDimSide(rel.getInput(0), dimSideFactors, joinKeys);
+        } else if (rel instanceof Project) {
+            // joinKeys is null means need not consider join keys.
+            if (joinKeys != null) {
+                List<RexNode> projects = ((Project) rel).getProjects();
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    dimSideFactors.isSuitableJoinKey = false;
+                }
+            }
+
+            visitDimSide(rel.getInput(0), dimSideFactors, joinKeys);
+        } else if (rel instanceof Calc) {
+            Calc calc = (Calc) rel;
+            // joinKeys is null means need not consider join keys.
+            if (joinKeys != null) {
+                List<RexNode> projects =
+                        calc.getProgram().getProjectList().stream()
+                                .map(p -> calc.getProgram().expandLocalRef(p))
+                                .collect(Collectors.toList());
+                ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys);
+                if (inputJoinKeys.isEmpty()) {
+                    dimSideFactors.isSuitableJoinKey = false;
+                }
+            }
+
+            RexProgram origProgram = ((Calc) rel).getProgram();
+            if (origProgram.getCondition() != null
+                    && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
+                dimSideFactors.hasFilter = true;
+            }
+            visitDimSide(rel.getInput(0), dimSideFactors, joinKeys);

Review Comment:
   ditto
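   For reference, the analogous change in the Calc branch might look like this
   (illustrative sketch only):

       } else if (rel instanceof Calc) {
           Calc calc = (Calc) rel;
           ImmutableIntList inputJoinKeys = joinKeys;
           // joinKeys is null means we need not consider join keys.
           if (joinKeys != null) {
               List<RexNode> projects =
                       calc.getProgram().getProjectList().stream()
                               .map(p -> calc.getProgram().expandLocalRef(p))
                               .collect(Collectors.toList());
               // map the join keys to the Calc's input field indices
               inputJoinKeys = getInputIndices(projects, joinKeys);
               if (inputJoinKeys.isEmpty()) {
                   dimSideFactors.isSuitableJoinKey = false;
               }
           }
           RexProgram origProgram = calc.getProgram();
           if (origProgram.getCondition() != null
                   && isSuitableFilter(origProgram.expandLocalRef(origProgram.getCondition()))) {
               dimSideFactors.hasFilter = true;
           }
           // recurse with the remapped keys rather than the original joinKeys
           visitDimSide(rel.getInput(0), dimSideFactors, inputJoinKeys);
       }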



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
