swuferhong commented on code in PR #21489:
URL: https://github.com/apache/flink/pull/21489#discussion_r1061446831


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java:
##########
@@ -201,33 +224,128 @@ private static void visitFactSide(
                             .collect(Collectors.toList());
             ImmutableIntList inputJoinKeys = getInputIndices(projects, 
joinKeys);
             if (inputJoinKeys.isEmpty()) {
-                factSideFactors.isSuitableJoinKey = false;
-                return;
+                return rel;
             }
 
-            visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys);
+            return rel.copy(
+                    rel.getTraitSet(),
+                    Collections.singletonList(
+                            convertDppFactSide(
+                                    rel.getInput(0),
+                                    inputJoinKeys,
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Join) {
+            Join currentJoin = (Join) rel;
+            return currentJoin.copy(
+                    currentJoin.getTraitSet(),
+                    Arrays.asList(
+                            convertDppFactSide(
+                                    currentJoin.getLeft(),
+                                    getInputIndices(currentJoin, joinKeys, 
true),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join),
+                            convertDppFactSide(
+                                    currentJoin.getRight(),
+                                    getInputIndices(currentJoin, joinKeys, 
false),
+                                    dimSide,
+                                    dimSideJoinKey,
+                                    factSideFactors,
+                                    join)));
+        } else if (rel instanceof Union) {
+            Union union = (Union) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : union.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input, joinKeys, dimSide, dimSideJoinKey, 
factSideFactors, join));
+            }
+            return union.copy(union.getTraitSet(), newInputs, union.all);
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            BatchPhysicalGroupAggregateBase agg = 
(BatchPhysicalGroupAggregateBase) rel;
+            List<RelNode> newInputs = new ArrayList<>();
+            for (RelNode input : agg.getInputs()) {
+                newInputs.add(
+                        convertDppFactSide(
+                                input,
+                                getInputIndices(agg, input, joinKeys),
+                                dimSide,
+                                dimSideJoinKey,
+                                factSideFactors,
+                                join));
+            }
+
+            return agg.copy(agg.getTraitSet(), newInputs);
         }
+
+        return rel;
     }
 
-    public static List<String> getSuitableDynamicFilteringFieldsInFactSide(
-            DynamicTableSource tableSource, List<String> candidateFields) {
-        List<String> acceptedFilterFields =
-                ((SupportsDynamicFiltering) 
tableSource).listAcceptedFilterFields();
-        if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) {
-            return new ArrayList<>();
+    private static boolean isSuitableJoin(Join join) {
+        // Now dynamic partition pruning supports left/right join, inner and 
semi
+        // join. but now semi
+        // join can not join reorder.
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.SEMI
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return false;
         }
 
-        List<String> suitableFields = new ArrayList<>();
-        // If candidateField not in acceptedFilterFields means dpp rule will 
not be matched,
-        // because we can not prune any partitions according to non-accepted 
filter fields
-        // provided by partition table source.
-        for (String candidateField : candidateFields) {
-            if (acceptedFilterFields.contains(candidateField)) {
-                suitableFields.add(candidateField);
+        JoinInfo joinInfo = join.analyzeCondition();
+        return !joinInfo.leftKeys.isEmpty();
+    }
+
+    private static ImmutableIntList getInputIndices(
+            BatchPhysicalGroupAggregateBase agg, RelNode aggInput, 
ImmutableIntList joinKeys) {
+        int[] indexMap = new int[aggInput.getRowType().getFieldCount()];
+        int[] grouping = agg.grouping();
+        if (grouping.length == 0) {
+            return joinKeys;
+        }
+        int beginIndex = grouping[0] - 1;
+        for (int i = 0; i < indexMap.length; i++) {
+            indexMap[i] = i;
+        }
+
+        System.arraycopy(grouping, 0, indexMap, 0, grouping.length);
+        if (beginIndex >= 0) {

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to