swuferhong commented on code in PR #21489:
URL: https://github.com/apache/flink/pull/21489#discussion_r1057114326


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -60,138 +68,153 @@
 public class DynamicPartitionPruningUtils {
 
     /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right join is not clear.
+     * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
+     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
+     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
+     * changed, this RelNode is not dim side.
      */
-    public static boolean supportDynamicPartitionPruning(Join join) {
-        return supportDynamicPartitionPruning(join, true)
-                || supportDynamicPartitionPruning(join, false);
+    public static boolean isDimSide(RelNode rel) {
+        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
+        visitDimSide(rel, dimSideFactors);
+        return dimSideFactors.isDimSide();
     }
 
-    /**
-     * For the input join node, judge whether the join left side and join right side meet the
-     * requirements of dynamic partition pruning. Fact side in left or right is clear. If meets the
-     * requirements, return true.
-     */
-    public static boolean supportDynamicPartitionPruning(Join join, boolean factInLeft) {
-        if (!ShortcutUtils.unwrapContext(join)
-                .getTableConfig()
-                .get(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED)) {
-            return false;
-        }
-        // Now dynamic partition pruning supports left/right join, inner and semi join. but now semi
-        // join can not join reorder.
-        if (join.getJoinType() == JoinRelType.LEFT) {
-            if (factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() == JoinRelType.RIGHT) {
-            if (!factInLeft) {
-                return false;
-            }
-        } else if (join.getJoinType() != JoinRelType.INNER
-                && join.getJoinType() != JoinRelType.SEMI) {
-            return false;
+    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
+            DynamicTableSource tableSource, List<String> candidateFields) {
+        List<String> acceptedFilterFields =
+                ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields();
+        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
+            return new ArrayList<>();
         }
 
-        JoinInfo joinInfo = join.analyzeCondition();
-        if (joinInfo.leftKeys.isEmpty()) {
-            return false;
+        List<String> suitableFields = new ArrayList<>();
+        // If candidateField not in acceptedFilterFields means dpp rule will not be matched,
+        // because we can not prune any partitions according to non-accepted filter fields
+        // provided by partition table source.
+        for (String candidateField : candidateFields) {
+            if (acceptedFilterFields.contains(candidateField)) {
+                suitableFields.add(candidateField);
+            }
         }
-        RelNode left = join.getLeft();
-        RelNode right = join.getRight();
-
-        // TODO Now fact side and dim side don't support many complex patterns, like join inside
-        // fact/dim side, agg inside fact/dim side etc. which will support next.
-        return factInLeft
-                ? isDynamicPartitionPruningPattern(left, right, joinInfo.leftKeys)
-                : isDynamicPartitionPruningPattern(right, left, joinInfo.rightKeys);
-    }
 
-    private static boolean isDynamicPartitionPruningPattern(
-            RelNode factSide, RelNode dimSide, ImmutableIntList factSideJoinKey) {
-        return isDimSide(dimSide) && isFactSide(factSide, factSideJoinKey);
+        return suitableFields;
     }
 
-    /** make a dpp fact side factor to recurrence in fact side. */
-    private static boolean isFactSide(RelNode rel, ImmutableIntList joinKeys) {
+    public static Tuple2<Boolean, RelNode> canConvertAndConvertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey,
+            Join join) {
         DppFactSideFactors factSideFactors = new DppFactSideFactors();
-        visitFactSide(rel, factSideFactors, joinKeys);
-        return factSideFactors.isFactSide();
+        RelNode newRel =
+                convertDppFactSide(rel, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join);
+        return Tuple2.of(factSideFactors.isChanged, newRel);
     }
 
-    /**
-     * Judge whether input RelNode meets the conditions of dimSide. If joinKeys is null means we
-     * need not consider the join keys in dim side, which already deal by dynamic partition pruning
-     * rule. If joinKeys not null means we need to judge whether joinKeys changed in dim side, if
-     * changed, this RelNode is not dim side.
-     */
-    private static boolean isDimSide(RelNode rel) {
-        DppDimSideFactors dimSideFactors = new DppDimSideFactors();
-        visitDimSide(rel, dimSideFactors);
-        return dimSideFactors.isDimSide();
-    }
-
-    /**
-     * Visit fact side to judge whether fact side has partition table, partition table source meets
-     * the condition of dpp table source and dynamic filtering keys changed in fact side.
-     */
-    private static void visitFactSide(
-            RelNode rel, DppFactSideFactors factSideFactors, ImmutableIntList joinKeys) {
+    public static RelNode convertDppFactSide(
+            RelNode rel,
+            ImmutableIntList joinKeys,
+            RelNode dimSide,
+            ImmutableIntList dimSideJoinKey,
+            DppFactSideFactors factSideFactors,
+            Join join) {
         if (rel instanceof TableScan) {
             TableScan scan = (TableScan) rel;
             if (scan instanceof BatchPhysicalDynamicFilteringTableSourceScan) {
                 // rule applied
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
             if (tableSourceTable == null) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             CatalogTable catalogTable = tableSourceTable.contextResolvedTable().getResolvedTable();
             List<String> partitionKeys = catalogTable.getPartitionKeys();
             if (partitionKeys.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             DynamicTableSource tableSource = tableSourceTable.tableSource();
             if (!(tableSource instanceof SupportsDynamicFiltering)
                     || !(tableSource instanceof ScanTableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
             if (!isNewSource((ScanTableSource) tableSource)) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
 
             List<String> candidateFields =
                     joinKeys.stream()
                             .map(i -> scan.getRowType().getFieldNames().get(i))
                             .collect(Collectors.toList());
             if (candidateFields.isEmpty()) {
-                factSideFactors.isSuitableFactScanSource = false;
-                return;
+                return rel;
             }
 
-            factSideFactors.isSuitableFactScanSource =
-                    !getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields)
-                            .isEmpty();
-        } else if (rel instanceof HepRelVertex) {
-            visitFactSide(((HepRelVertex) rel).getCurrentRel(), factSideFactors, joinKeys);
+            List<String> acceptedFilterFields =
+                    getSuitableDynamicFilteringFieldsInFactSide(tableSource, candidateFields);
+
+            if (acceptedFilterFields.size() == 0) {
+                return rel;
+            }
+
+            // Apply suitable accepted filter fields to source.
+            ((SupportsDynamicFiltering) tableSource).applyDynamicFiltering(acceptedFilterFields);
+
+            List<Integer> acceptedFieldIndices =
+                    acceptedFilterFields.stream()
+                            .map(f -> scan.getRowType().getFieldNames().indexOf(f))
+                            .collect(Collectors.toList());
+            List<Integer> dynamicFilteringFieldIndices = new ArrayList<>();
+            for (int i = 0; i < joinKeys.size(); ++i) {
+                if (acceptedFieldIndices.contains(joinKeys.get(i))) {
+                    dynamicFilteringFieldIndices.add(dimSideJoinKey.get(i));
+                }
+            }
+
+            BatchPhysicalDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                    createDynamicFilteringConnector(dimSide, dynamicFilteringFieldIndices);
+
+            factSideFactors.isChanged = true;
+
+            if (!isSuitableJoin(join)) {

Review Comment:
   > this validation can be moved to `join` branch
   
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to