swuferhong commented on code in PR #21489:
URL: https://github.com/apache/flink/pull/21489#discussion_r1064259324


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -60,318 +69,470 @@
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join 
right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right 
join is not clear.
+     * Judge whether the input RelNode meets the conditions of dimSide. If 
joinKeys is null means we
+     * need not consider the join keys in dim side, which already deal by 
dynamic partition pruning
+     * rule. If joinKeys not null means we need to judge whether joinKeys 
changed in dim side, if
+     * changed, this RelNode is not dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDppDimSide(RelNode rel) {
+        DppDimSideChecker dimSideChecker = new DppDimSideChecker(rel);
+        return dimSideChecker.isDppDimSide();
     }
 
     /**
-     * For the input join node, judge whether the join left side and join 
right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right 
is clear. If meets the
-     * requirements, return true.
+     * Judge whether the input RelNode can be converted to the dpp fact side. 
If the input RelNode
+     * can be converted, this method will return the converted fact side whose 
partitioned table
+     * source will be converted to {@link 
BatchPhysicalDynamicFilteringTableSourceScan}, If not,
+     * this method will return the origin RelNode.
      */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean 
factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                
.get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and 
semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey) {
+        DppFactSideChecker dppFactSideChecker =
+                new DppFactSideChecker(rel, joinKeys, dimSide, dimSideJoinKey);
+        return dppFactSideChecker.canConvertAndConvertDppFactSide();
+    }
+
+    /** Judge whether the join node is suitable one for dpp pattern. */
+    public static boolean isSuitableJoin(Join join) {
+        // Now dynamic partition pruning supports left/right join, inner and 
semi
+        // join. but now semi join can not join reorder.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.SEMI
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
             return false;
         }
 
         JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
-        }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex 
patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support 
next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, 
joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, 
joinInfo.rightKeys);
+        return !joinInfo.leftKeys.isEmpty();
     }
 
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList 
factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
-    }
+    /** This class is used to check whether the relNode is dpp dim side. */
+    private static class DppDimSideChecker {
+        private final RelNode relNode;
+        private boolean hasFilter;
+        private boolean hasPartitionedScan;
+        private final List<ContextResolvedTable> tables = new ArrayList<>();
 
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
-        DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
-    }
+        public DppDimSideChecker(RelNode relNode) {
+            this.relNode = relNode;
+        }
 
-    /**
-     * Judge whether input RelNode meets the conditions of dimSide. If 
joinKeys is null means we
-     * need not consider the join keys in dim side, which already deal by 
dynamic partition pruning
-     * rule. If joinKeys not null means we need to judge whether joinKeys 
changed in dim side, if
-     * changed, this RelNode is not dim side.
-     */
-    private static boolean isDimSide(RelNode rel) {
-        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
-        visitDimSide(rel, dimSideFactors);
-        return dimSideFactors.isDimSide();
-    }
+        public boolean isDppDimSide() {
+            visitDimSide(this.relNode);
+            return hasFilter && !hasPartitionedScan && tables.size() == 1;
+        }
 
-    /**
-     * Visit fact side to judge whether fact side has partition table, 
partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in 
fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList 
joinKeys) {
-        if (rel instanceof TableScan) {
-            TableScan scan = (TableScan) rel;
-            if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
-                // rule applied
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
-            if (tableSourceTable == null) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            CatalogTable catalogTable = 
tableSourceTable.contextResolvedTable().getResolvedTable();
-            List<String> partitionKeys = catalogTable.getPartitionKeys();
-            if (partitionKeys.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            DynamicTableSource tableSource = tableSourceTable.tableSource();
-            if (!(tableSource instanceof SupportsDynamicFiltering)
-                    || !(tableSource instanceof ScanTableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
-            if (!isNewSource((ScanTableSource) tableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
-            }
+        /**
+         * Visit dim side to judge whether dim side has filter condition and 
whether dim side's
+         * source table scan is non partitioned scan.
+         */
+        private void visitDimSide(RelNode rel) {
+            // TODO Let visitDimSide more efficient and more accurate. Like a 
filter on dim table or
+            // a filter for the partition field on fact table.
+            if (rel instanceof TableScan) {
+                TableScan scan = (TableScan) rel;
+                TableSourceTable table = 
scan.getTable().unwrap(TableSourceTable.class);
+                if (table == null) {
+                    return;
+                }
+                if (!hasFilter
+                        && table.abilitySpecs() != null
+                        && table.abilitySpecs().length != 0) {
+                    for (SourceAbilitySpec spec : table.abilitySpecs()) {
+                        if (spec instanceof FilterPushDownSpec) {
+                            List<RexNode> predicates = ((FilterPushDownSpec) 
spec).getPredicates();
+                            for (RexNode predicate : predicates) {
+                                if (isSuitableFilter(predicate)) {
+                                    hasFilter = true;
+                                }
+                            }
+                        }
+                    }
+                }
+                CatalogTable catalogTable = 
table.contextResolvedTable().getResolvedTable();
+                if (catalogTable.isPartitioned()) {
+                    hasPartitionedScan = true;
+                    return;
+                }
 
-            List<String> candidateFields =
-                    joinKeys.stream()
-                            .map(i -> scan.getRowType().getFieldNames().get(i))
-                            .collect(Collectors.toList());
-            if (candidateFields.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                // To ensure there is only one source on the dim side.
+                setTables(table.contextResolvedTable());
+            } else if (rel instanceof HepRelVertex) {
+                visitDimSide(((HepRelVertex) rel).getCurrentRel());
+            } else if (rel instanceof Exchange || rel instanceof Project) {
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Calc) {
+                RexProgram origProgram = ((Calc) rel).getProgram();
+                if (origProgram.getCondition() != null
+                        && isSuitableFilter(
+                                
origProgram.expandLocalRef(origProgram.getCondition()))) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Filter) {
+                if (isSuitableFilter(((Filter) rel).getCondition())) {
+                    hasFilter = true;
+                }
+                visitDimSide(rel.getInput(0));
+            } else if (rel instanceof Join) {
+                Join join = (Join) rel;
+                visitDimSide(join.getLeft());
+                visitDimSide(join.getRight());
+            } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+                visitDimSide(((BatchPhysicalGroupAggregateBase) 
rel).getInput());
+            } else if (rel instanceof Union) {
+                Union union = (Union) rel;
+                for (RelNode input : union.getInputs()) {
+                    visitDimSide(input);
+                }
             }
+        }
 
-            factSideFactors.isSuitableFactScanSource =
-                    !getSuitableDynamicFilteringFieldsInFactSide(tableSource, 
candidateFields)
-                            .isEmpty();
-        } else if (rel instanceof HepRelVertex) {
-            visitFactSide(((HepRelVertex) rel).getCurrentRel(), 
factSideFactors, joinKeys);
-        } else if (rel instanceof Exchange || rel instanceof Filter) {
-            visitFactSide(rel.getInput(0), factSideFactors, joinKeys);
-        } else if (rel instanceof Project) {
-            List<RexNode> projects = ((Project) rel).getProjects();
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, 
joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+        /**
+         * Not all filter condition suitable for using to filter partitions by 
dynamic partition
+         * pruning rules. For example, NOT NULL can only filter one default 
partition which have a
+         * small impact on filtering data.
+         */
+        private static boolean isSuitableFilter(RexNode filterCondition) {
+            switch (filterCondition.getKind()) {
+                case AND:
+                    List<RexNode> conjunctions = 
RelOptUtil.conjunctions(filterCondition);
+                    return isSuitableFilter(conjunctions.get(0))
+                            || isSuitableFilter(conjunctions.get(1));
+                case OR:
+                    List<RexNode> disjunctions = 
RelOptUtil.disjunctions(filterCondition);
+                    return isSuitableFilter(disjunctions.get(0))
+                            && isSuitableFilter(disjunctions.get(1));
+                case NOT:
+                    return isSuitableFilter(((RexCall) 
filterCondition).operands.get(0));
+                case EQUALS:
+                case GREATER_THAN:
+                case GREATER_THAN_OR_EQUAL:
+                case LESS_THAN:
+                case LESS_THAN_OR_EQUAL:
+                case NOT_EQUALS:
+                case IN:
+                case LIKE:
+                case CONTAINS:
+                case SEARCH:
+                case IS_FALSE:
+                case IS_NOT_FALSE:
+                case IS_NOT_TRUE:
+                case IS_TRUE:
+                    // TODO adding more suitable filters which can filter 
enough partitions after
+                    // using this filter in dynamic partition pruning.
+                    return true;
+                default:
+                    return false;
             }
+        }
 
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
-        } else if (rel instanceof Calc) {
-            Calc calc = (Calc) rel;
-            RexProgram program = calc.getProgram();
-            List<RexNode> projects =
-                    program.getProjectList().stream()
-                            .map(program::expandLocalRef)
-                            .collect(Collectors.toList());
-            ImmutableIntList inputJoinKeys = getInputIndices(projects, 
joinKeys);
-            if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+        private void setTables(ContextResolvedTable catalogTable) {
+            if (tables.size() == 0) {
+                tables.add(catalogTable);
+            } else {
+                for (ContextResolvedTable thisTable : new ArrayList<>(tables)) 
{

Review Comment:
   > nit: why we need new ArrayList instance here?
   
   The list `tables` has an `add` operation inside the for loop. If we iterated over `tables` directly instead of over a new `ArrayList` copy, adding to it during iteration would cause its fail-fast iterator to throw a `ConcurrentModificationException`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to