swuferhong commented on code in PR #21489: URL: https://github.com/apache/flink/pull/21489#discussion_r1061446831
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DynamicPartitionPruningUtils.java: ########## @@ -201,33 +224,128 @@ private static void visitFactSide( .collect(Collectors.toList()); ImmutableIntList inputJoinKeys = getInputIndices(projects, joinKeys); if (inputJoinKeys.isEmpty()) { - factSideFactors.isSuitableJoinKey = false; - return; + return rel; } - visitFactSide(rel.getInput(0), factSideFactors, inputJoinKeys); + return rel.copy( + rel.getTraitSet(), + Collections.singletonList( + convertDppFactSide( + rel.getInput(0), + inputJoinKeys, + dimSide, + dimSideJoinKey, + factSideFactors, + join))); + } else if (rel instanceof Join) { + Join currentJoin = (Join) rel; + return currentJoin.copy( + currentJoin.getTraitSet(), + Arrays.asList( + convertDppFactSide( + currentJoin.getLeft(), + getInputIndices(currentJoin, joinKeys, true), + dimSide, + dimSideJoinKey, + factSideFactors, + join), + convertDppFactSide( + currentJoin.getRight(), + getInputIndices(currentJoin, joinKeys, false), + dimSide, + dimSideJoinKey, + factSideFactors, + join))); + } else if (rel instanceof Union) { + Union union = (Union) rel; + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : union.getInputs()) { + newInputs.add( + convertDppFactSide( + input, joinKeys, dimSide, dimSideJoinKey, factSideFactors, join)); + } + return union.copy(union.getTraitSet(), newInputs, union.all); + } else if (rel instanceof BatchPhysicalGroupAggregateBase) { + BatchPhysicalGroupAggregateBase agg = (BatchPhysicalGroupAggregateBase) rel; + List<RelNode> newInputs = new ArrayList<>(); + for (RelNode input : agg.getInputs()) { + newInputs.add( + convertDppFactSide( + input, + getInputIndices(agg, input, joinKeys), + dimSide, + dimSideJoinKey, + factSideFactors, + join)); + } + + return agg.copy(agg.getTraitSet(), newInputs); } + + return rel; } - public static List<String> getSuitableDynamicFilteringFieldsInFactSide( - DynamicTableSource tableSource, List<String> candidateFields) { - List<String> acceptedFilterFields = - ((SupportsDynamicFiltering) tableSource).listAcceptedFilterFields(); - if (acceptedFilterFields == null || acceptedFilterFields.isEmpty()) { - return new ArrayList<>(); + private static boolean isSuitableJoin(Join join) { + // Now dynamic partition pruning supports left/right join, inner and semi + // join. but now semi + // join can not join reorder. + if (join.getJoinType() != JoinRelType.INNER + && join.getJoinType() != JoinRelType.SEMI + && join.getJoinType() != JoinRelType.LEFT + && join.getJoinType() != JoinRelType.RIGHT) { + return false; } - List<String> suitableFields = new ArrayList<>(); - // If candidateField not in acceptedFilterFields means dpp rule will not be matched, - // because we can not prune any partitions according to non-accepted filter fields - // provided by partition table source. - for (String candidateField : candidateFields) { - if (acceptedFilterFields.contains(candidateField)) { - suitableFields.add(candidateField); + JoinInfo joinInfo = join.analyzeCondition(); + return !joinInfo.leftKeys.isEmpty(); + } + + private static ImmutableIntList getInputIndices( + BatchPhysicalGroupAggregateBase agg, RelNode aggInput, ImmutableIntList joinKeys) { + int[] indexMap = new int[aggInput.getRowType().getFieldCount()]; + int[] grouping = agg.grouping(); + if (grouping.length == 0) { + return joinKeys; + } + int beginIndex = grouping[0] - 1; + for (int i = 0; i < indexMap.length; i++) { + indexMap[i] = i; + } + + System.arraycopy(grouping, 0, indexMap, 0, grouping.length); + if (beginIndex >= 0) { Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org