godfreyhe commented on a change in pull request #13897: URL: https://github.com/apache/flink/pull/13897#discussion_r517849708
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processors/MultipleInputNodeCreationProcessor.java ########## @@ -283,6 +283,102 @@ private void optimizeMultipleInputGroups(List<ExecNodeWrapper> orderedWrappers) wrapper.removeFromGroup(); } } + + // wrappers are checked in topological order from sinks to sources + for (ExecNodeWrapper wrapper : orderedWrappers) { + MultipleInputGroup group = wrapper.group; + if (group == null) { + // we only consider nodes currently in a multiple input group + continue; + } + + if (isOutputOfMultipleInputGroup(wrapper) && wrapper.execNode instanceof Union) { + // optimization 5. this optimization remove redundant union at the output of a + // multiple input, consider the following graph: + // + // source -> exchange -> agg ---\ + // source -> exchange -> agg --> union -> + // source -> exchange -> join --/ + // source -> exchange --/ + // + // we'll initially put aggs, the join and the union into a multiple input, while + // the union here is actually redundant. + int numberOfUsefulInputs = 0; + List<Integer> uselessBranches = new ArrayList<>(); + List<List<ExecNodeWrapper>> sameGroupWrappersList = new ArrayList<>(); + + // an input branch is useful if it contains a node with two or more inputs other + // than union. we shall keep the union if it has two or more useful input branches, + // as this may benefit source chaining. 
consider the following example: + // + // chainable source -> join -\ + // / \ + // chainable source --< union + // \ / + // chainable source -> join -/ + for (int i = 0; i < wrapper.inputs.size(); i++) { + ExecNodeWrapper inputWrapper = wrapper.inputs.get(i); + List<ExecNodeWrapper> sameGroupWrappers = getInputWrappersInSameGroup(inputWrapper, wrapper.group); + sameGroupWrappersList.add(sameGroupWrappers); + long numberOfValuableNodes = sameGroupWrappers.stream() + .filter(w -> w.inputs.size() >= 2 && !(w.execNode instanceof Union)) + .count(); + if (numberOfValuableNodes > 0) { + numberOfUsefulInputs++; + } else { + uselessBranches.add(i); + } + } + + if (numberOfUsefulInputs < 2) { + // remove this union and its useless branches from multiple input + for (int branch : uselessBranches) { + List<ExecNodeWrapper> sameGroupWrappers = sameGroupWrappersList.get(branch); + for (ExecNodeWrapper w : sameGroupWrappers) { + if (w.group != null) { + w.removeFromGroup(); + } + } + } + wrapper.removeFromGroup(); + } + } + } + } + + private List<ExecNodeWrapper> getInputWrappersInSameGroup(ExecNodeWrapper wrapper, MultipleInputGroup group) { + List<ExecNodeWrapper> ret = new ArrayList<>(); + Queue<ExecNodeWrapper> queue = new LinkedList<>(); + Set<ExecNodeWrapper> visited = new HashSet<>(); + queue.add(wrapper); + visited.add(wrapper); + + while (!queue.isEmpty()) { + ExecNodeWrapper w = queue.poll(); + if (w.group != group) { + // if a wrapper is not in the required group than its inputs will + // also not be in the group, so we can just skip it + continue; + } + ret.add(w); + + for (ExecNodeWrapper inputWrapper : w.inputs) { + if (visited.contains(inputWrapper)) { + continue; + } + queue.add(inputWrapper); + visited.add(inputWrapper); + } + } + + return ret; + } + + private boolean isOutputOfMultipleInputGroup(ExecNodeWrapper wrapper) { + Preconditions.checkNotNull( Review comment: Is a wrapper the output node if it is the root of its group?
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org