File path: 
@@ -0,0 +1,206 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.processor.utils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+ * A data structure storing the topological and input priority information of 
an {@link ExecNode} graph.
+ */
+class TopologyGraph {
+       private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+       /**
+        * Creates a topology graph from every node reachable from {@code roots} through input edges,
+        * without treating any node as a boundary.
+        */
+       TopologyGraph(List<ExecNode<?, ?>> roots) {
+               this(roots, Collections.<ExecNode<?, ?>>emptySet());
+       }
+       /**
+        * Creates a topology graph by linking every input edge reachable from {@code roots}.
+        *
+        * <p>Traversal stops at nodes contained in {@code boundaries}. A boundary node still shows up
+        * as the input end of the edge coming from the node that consumes it, but the boundary's own
+        * input edges are neither linked nor traversed.
+        */
+       TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) {
+               this.nodes = new HashMap<>();
+
+               // mirror every edge of the original exec node graph, stopping at boundary nodes
+               AbstractExecNodeExactlyOnceVisitor edgeCollector = new AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               if (!boundaries.contains(node)) {
+                                       node.getInputNodes().forEach(input -> link(input, node));
+                                       visitInputs(node);
+                               }
+                       }
+               };
+               for (ExecNode<?, ?> root : roots) {
+                       root.accept(edgeCollector);
+               }
+       }
+       /**
+        * Adds an edge from {@code from} to {@code to}, unless doing so would create a cycle.
+        *
+        * <p>A cycle would appear exactly when {@code from} is already reachable from {@code to}.
+        * This includes the self-loop case, where both arguments are the same node.
+        *
+        * @return {@code true} if the edge was added, {@code false} if it was rejected
+        */
+       boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode source = getTopologyNode(from);
+               TopologyNode target = getTopologyNode(to);
+               boolean wouldCreateCycle = canReach(target, source);
+               if (!wouldCreateCycle) {
+                       // keep both adjacency collections in sync
+                       source.outputs.add(target);
+                       target.inputs.add(source);
+               }
+               return !wouldCreateCycle;
+       }
+       /**
+        * Removes the edge from {@code from} to {@code to}. If there is no such edge, the adjacency
+        * collections of both nodes are left unchanged.
+        */
+       void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode source = getTopologyNode(from);
+               TopologyNode target = getTopologyNode(to);
+               // link() records an edge on both sides, so drop it from both sides here
+               target.inputs.remove(source);
+               source.outputs.remove(target);
+       }
+       /**
+        * Calculate the maximum distance of the currently added nodes from the 
nodes without inputs.
+        * The smallest distance is 0 (which are exactly the nodes without 
inputs) and the distances of
+        * other nodes are the largest distances in their inputs plus 1.
+        */
+       Map<ExecNode<?, ?>, Integer> calculateDistance() {

Review comment:
       Could you give `distance` a precise definition?

File path: 
@@ -0,0 +1,484 @@
+package org.apache.flink.table.planner.plan.processor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Union;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+ * A {@link DAGProcessor} which organizes {@link ExecNode}s into multiple input 
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a 
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+       private final boolean isStreaming;
+       /**
+        * Creates the processor.
+        *
+        * @param isStreaming whether the plan being processed is a streaming plan; for non-streaming
+        *                    plans, input priority conflicts are detected and resolved before the
+        *                    nodes are grouped
+        */
+       public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+               this.isStreaming = isStreaming;
+       }
+       @Override
+       public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> roots, 
DAGProcessContext context) {
+               if (!isStreaming) {
+                       // As multiple input nodes use function call to deliver 
records between sub-operators,
+                       // we cannot rely on network buffers to buffer records 
not yet ready to be read,
+                       // so only BLOCKING dam behavior is safe here.
+                       // If conflict is detected under this stricter 
+                       // we add a PIPELINED exchange to mark that its input 
and output node cannot be merged
+                       // into the same multiple input node
+                       InputPriorityConflictResolver resolver = new 
+                               roots,
+                               ExecEdge.DamBehavior.BLOCKING,
+                               ShuffleMode.PIPELINED);
+                       resolver.detectAndResolve();
+               }
+               List<ExecNodeWrapper> rootWrappers = wrapExecNodes(roots);
+               // sort all nodes in topological order, sinks come first and 
sources come last
+               List<ExecNodeWrapper> orderedWrappers = 
+               // group nodes into multiple input groups
+               createMultipleInputGroups(orderedWrappers);
+               // apply optimizations to remove unnecessary nodes out of 
multiple input groups
+               optimizeMultipleInputGroups(orderedWrappers);
+               // create the real multiple input nodes
+               return createMultipleInputNodes(rootWrappers);
+       }
+       // 
+       // Wrapping and Sorting
+       // 
+       private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> 
rootNodes) {
+               Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new 
+               AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               ExecNodeWrapper wrapper = 
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+                               for (ExecNode<?, ?> input : 
node.getInputNodes()) {
+                                       ExecNodeWrapper inputWrapper = 
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+                                       wrapper.inputs.add(inputWrapper);
+                                       inputWrapper.outputs.add(wrapper);
+                               }
+                               visitInputs(node);
+                       }
+               };
+               rootNodes.forEach(s -> s.accept(visitor));
+               List<ExecNodeWrapper> rootWrappers = new ArrayList<>();
+               for (ExecNode<?, ?> root : rootNodes) {
+                       ExecNodeWrapper rootWrapper = wrapperMap.get(root);
+                       Preconditions.checkNotNull(rootWrapper, "Root node is 
not wrapped. This is a bug.");
+                       rootWrappers.add(rootWrapper);
+               }
+               return rootWrappers;
+       }
+       /**
+        * Orders all wrappers reachable from the roots so that each node comes after all of its
+        * outputs. Starting from the roots (sinks), this puts sinks first and sources last.
+        *
+        * <p>A wrapper is emitted only once every one of its outputs has been emitted. This is
+        * tracked by counting, per wrapper, how many of its outputs have already been emitted.
+        */
+       private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> rootWrappers) {
+               List<ExecNodeWrapper> sorted = new ArrayList<>();
+               Map<ExecNodeWrapper, Integer> emittedOutputCount = new HashMap<>();
+               Queue<ExecNodeWrapper> ready = new LinkedList<>(rootWrappers);
+               while (!ready.isEmpty()) {
+                       ExecNodeWrapper current = ready.poll();
+                       sorted.add(current);
+                       for (ExecNodeWrapper input : current.inputs) {
+                               // an input becomes ready once all of its outputs have been emitted
+                               if (emittedOutputCount.merge(input, 1, Integer::sum) == input.outputs.size()) {
+                                       ready.offer(input);
+                               }
+                       }
+               }
+               return sorted;
+       }
+       // 
+       // Multiple Input Groups Creating
+       // 
+       private void createMultipleInputGroups(List<ExecNodeWrapper> 
orderedWrappers) {
+               // wrappers are checked in topological order from sinks to 
+               for (ExecNodeWrapper wrapper : orderedWrappers) {
+                       // we skip nodes which cannot be a member of a multiple 
input node
+                       if (!canBeMultipleInputNodeMember(wrapper)) {
+                               continue;
+                       }
+                       // we first try to assign this wrapper into the same 
group with its outputs
+                       MultipleInputGroup outputGroup = 
+                       if (outputGroup != null) {
+                               wrapper.addToGroup(outputGroup);
+                               continue;
+                       }
+                       // we then try to create a new multiple input group 
with this node as the root
+                       if (canBeRootOfMultipleInputGroup(wrapper)) {
+                               wrapper.createGroup();
+                       }
+                       // all our attempts failed, this node will not be in a 
multiple input node
+               }
+       }
+       /**
+        * Returns whether the wrapped node may be placed inside a multiple input node. Sources (nodes
+        * without inputs) and exchanges can never be members.
+        */
+       private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+               boolean isSource = wrapper.inputs.isEmpty();
+               boolean isExchange = wrapper.execNode instanceof Exchange;
+               return !isSource && !isExchange;
+       }
+       /**
+        * A node can only join the multiple input group of its outputs if it has at least one output
+        * and all of its outputs belong to the same, non-null group.
+        *
+        * @return the {@link MultipleInputGroup} shared by all outputs; null if there are no outputs,
+        *         if the first output has no group, or if the outputs belong to different groups
+        */
+       private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper wrapper) {
+               if (wrapper.outputs.isEmpty()) {
+                       return null;
+               }
+               MultipleInputGroup candidate = wrapper.outputs.get(0).group;
+               boolean allShareGroup = candidate != null
+                               && wrapper.outputs.stream().allMatch(output -> output.group == candidate);
+               return allShareGroup ? candidate : null;
+       }
+       /**
+        * Returns whether a new multiple input group may be rooted at this node. Only nodes with at
+        * least two inputs qualify, because one-input operators are already handled by operator
+        * chaining.
+        */
+       private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+               return wrapper.inputs.size() > 1;
+       }
+       // 
+       // Multiple Input Groups Optimizing
+       // 
+       private void optimizeMultipleInputGroups(List<ExecNodeWrapper> 
orderedWrappers) {
+               // wrappers are checked in topological order from sources to 
+               for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+                       ExecNodeWrapper wrapper = orderedWrappers.get(i);
+                       MultipleInputGroup group = wrapper.group;
+                       if (group == null) {
+                               // we only consider nodes currently in a 
multiple input group
+                               continue;
+                       }
+                       boolean isUnion = wrapper.execNode instanceof Union;
+                       if (group.members.size() == 1) {
+                               Preconditions.checkState(
+                                       wrapper == group.root,
+                                       "The only member of a multiple input 
group is not its root. This is a bug.");
+                               // optimization 1. we clean up multiple input 
groups with only 1 member,
+                               // unless one of its input is a FLIP-27 source 
(for maximizing source chaining),
+                               // however unions do not apply to this 
optimization because they're not real operators
+                               if (isUnion || 
+                                               inputWrapper -> 
isChainableSource(inputWrapper.execNode))) {
+                                       wrapper.removeFromGroup();
+                               }
+                               continue;
+                       }
+                       if (!isEntranceOfMultipleInputGroup(wrapper)) {
+                               // we're not removing a node from the middle of 
a multiple input group
+                               continue;
+                       }
+                       boolean shouldRemove = false;
+                       if (isUnion) {
+                               // optimization 2. we do not allow union to be 
the tail of a multiple input
+                               // as we're paying extra function calls for 
this, unless one of the united
+                               // input is a FLIP-27 source
+                               shouldRemove = 
+                                       inputWrapper -> 
+                       } else if (wrapper.inputs.size() == 1) {
+                               // optimization 3. for one-input operators 
we'll remove it unless its input
+                               // is an exchange or a FLIP-27 source, this is 
mainly to avoid the following
+                               // pattern:
+                               // non-chainable source -> calc --\
+                               //                                 join ->
+                               // non-chainable source -> calc --/
+                               // if we move two calcs into the multiple input 
group rooted at the join, we're
+                               // directly shuffling large amount of records 
from the source without filtering
+                               // by the calc
+                               ExecNode<?, ?> input = 
+                               shouldRemove = !(input instanceof Exchange) && 
+                       }
+                       // optimization 4. for singleton operations (for 
example singleton global agg)
+                       // we're not including it into the multiple input node 
as we have to ensure that
+                       // the whole multiple input can only have 1 parallelism.
+                       // continuous singleton operations connected by 
forwarding shuffle will be dealt
+                       // together with optimization 3
+                       shouldRemove |= 
wrapper.inputs.stream().anyMatch(inputWrapper ->
+                               inputWrapper.execNode instanceof 
BatchExecExchange &&

Review comment:
       Should this also consider `StreamExecExchange` here? `BatchExecExchange` -> 

File path: 
@@ -0,0 +1,82 @@
+package org.apache.flink.table.planner.plan.processor.utils;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.Collections;
+ * Tests for {@link InputPriorityConflictResolver}.
+ */
+public class InputPriorityConflictResolverTest {
+       @Test
+       public void testDetectAndResolve() {
+               // P = ExecEdge.DamBehavior.PIPELINED, E = 
+               // P100 = PIPELINED + priority 100
+               //
+               // 0 --------(P0)----> 1 --(P0)-----------> 7
+               //  \                    \-(P0)-> 2 -(P0)--/
+               //   \-------(P0)----> 3 --(P1)-----------/
+               //    \------(P0)----> 4 --(P10)---------/
+               //     \              /                 /
+               //      \    8 -(P0)-<                 /
+               //       \            \               /
+               //        \--(E0)----> 5 --(P10)-----/
+               // 6 ---------(P100)----------------/
+               TestingBatchExecNode[] nodes = new TestingBatchExecNode[9];
+               for (int i = 0; i < nodes.length; i++) {
+                       nodes[i] = new TestingBatchExecNode();
+               }
+               nodes[1].addInput(nodes[0], 
+               nodes[2].addInput(nodes[1], 
+               nodes[3].addInput(nodes[0], 
+               nodes[4].addInput(nodes[8], 
+               nodes[4].addInput(nodes[0], 
+               nodes[5].addInput(nodes[8], 
+               nodes[5].addInput(nodes[0], 
+               nodes[7].addInput(nodes[1], 
+               nodes[7].addInput(nodes[2], 
+               nodes[7].addInput(nodes[3], 
+               nodes[7].addInput(nodes[4], 
+               nodes[7].addInput(nodes[5], 
+               nodes[7].addInput(nodes[6], 
+               InputPriorityConflictResolver resolver = new 
+                       Collections.singletonList(nodes[7]),
+                       ExecEdge.DamBehavior.END_INPUT,
+                       ShuffleMode.BATCH);
+               resolver.detectAndResolve();
+               Assert.assertEquals(nodes[1], nodes[7].getInputNodes().get(0));
+               Assert.assertEquals(nodes[2], nodes[7].getInputNodes().get(1));
+               Assert.assertTrue(nodes[7].getInputNodes().get(2) instanceof 

Review comment:
       Should the test also check the shuffle mode?

File path: 
@@ -27,12 +27,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge,
 import org.apache.flink.table.planner.plan.nodes.physical.MultipleInputRel
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import java.util
+import org.apache.flink.streaming.api.operators.ChainingStrategy

Review comment:
       Please reorder the imports: `ChainingStrategy` should be grouped with the other `org.apache.flink` imports.

File path: 
@@ -0,0 +1,206 @@
+package org.apache.flink.table.planner.plan.processor.utils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.util.Preconditions;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+ * A data structure storing the topological and input priority information of 
an {@link ExecNode} graph.
+ */
+class TopologyGraph {
+       private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+       /**
+        * Creates a topology graph from every node reachable from {@code roots} through input edges,
+        * without treating any node as a boundary.
+        */
+       TopologyGraph(List<ExecNode<?, ?>> roots) {
+               this(roots, Collections.<ExecNode<?, ?>>emptySet());
+       }
+       /**
+        * Creates a topology graph by linking every input edge reachable from {@code roots}.
+        *
+        * <p>Traversal stops at nodes contained in {@code boundaries}. A boundary node still shows up
+        * as the input end of the edge coming from the node that consumes it, but the boundary's own
+        * input edges are neither linked nor traversed.
+        */
+       TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) {
+               this.nodes = new HashMap<>();
+
+               // mirror every edge of the original exec node graph, stopping at boundary nodes
+               AbstractExecNodeExactlyOnceVisitor edgeCollector = new AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               if (!boundaries.contains(node)) {
+                                       node.getInputNodes().forEach(input -> link(input, node));
+                                       visitInputs(node);
+                               }
+                       }
+               };
+               for (ExecNode<?, ?> root : roots) {
+                       root.accept(edgeCollector);
+               }
+       }
+       /**
+        * Adds an edge from {@code from} to {@code to}, unless doing so would create a cycle.
+        *
+        * <p>A cycle would appear exactly when {@code from} is already reachable from {@code to}.
+        * This includes the self-loop case, where both arguments are the same node.
+        *
+        * @return {@code true} if the edge was added, {@code false} if it was rejected
+        */
+       boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode source = getTopologyNode(from);
+               TopologyNode target = getTopologyNode(to);
+               boolean wouldCreateCycle = canReach(target, source);
+               if (!wouldCreateCycle) {
+                       // keep both adjacency collections in sync
+                       source.outputs.add(target);
+                       target.inputs.add(source);
+               }
+               return !wouldCreateCycle;
+       }
+       /**
+        * Removes the edge from {@code from} to {@code to}. If there is no such edge, the adjacency
+        * collections of both nodes are left unchanged.
+        */
+       void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode source = getTopologyNode(from);
+               TopologyNode target = getTopologyNode(to);
+               // link() records an edge on both sides, so drop it from both sides here
+               target.inputs.remove(source);
+               source.outputs.remove(target);
+       }
+       /**
+        * Calculates the distance of every node in this graph from the nodes without inputs.
+        *
+        * <p>The distance of a node is the number of edges on the <em>longest</em> path leading to it
+        * from any node without inputs. Nodes without inputs therefore have distance 0, and every
+        * other node has distance {@code 1 + max(distance of its inputs)}. Because the maximum is
+        * used, a node's distance is always strictly greater than the distance of each of its inputs.
+        *
+        * <p>Nodes are processed in topological order (Kahn's algorithm): a node is handled only after
+        * all of its inputs have been handled. {@link #link} refuses edges that would form a cycle,
+        * so every node is eventually handled.
+        *
+        * @return a map from each exec node to its distance
+        */
+       Map<ExecNode<?, ?>, Integer> calculateDistance() {
+               Map<ExecNode<?, ?>, Integer> distances = new HashMap<>();
+               Map<TopologyNode, Integer> processedInputCount = new HashMap<>();
+
+               // start from the nodes without any input, whose distance is 0
+               Queue<TopologyNode> ready = new LinkedList<>();
+               nodes.values().stream().filter(n -> n.inputs.isEmpty()).forEach(ready::offer);
+
+               while (!ready.isEmpty()) {
+                       TopologyNode current = ready.poll();
+
+                       int distance = 0;
+                       for (TopologyNode input : current.inputs) {
+                               int inputDistance = Preconditions.checkNotNull(
+                                               distances.get(input.execNode),
+                                               "The distance of an input node is not calculated. This is a bug.");
+                               distance = Math.max(distance, inputDistance + 1);
+                       }
+                       distances.put(current.execNode, distance);
+
+                       // an output becomes ready once all of its inputs have been processed
+                       for (TopologyNode output : current.outputs) {
+                               int processed = processedInputCount.merge(output, 1, Integer::sum);
+                               if (processed == output.inputs.size()) {
+                                       ready.offer(output);
+                               }
+                       }
+               }
+               return distances;
+       }
+       /**
+        * Returns whether {@code to} can be reached from {@code from} by following output edges.
+        * A node is always considered reachable from itself.
+        */
+       @VisibleForTesting
+       boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               return canReach(getTopologyNode(from), getTopologyNode(to));
+       }
+       /**
+        * Breadth-first search along output edges. Returns whether {@code to} is reachable from
+        * {@code from}, including the case where {@code from} equals {@code to}.
+        */
+       private boolean canReach(TopologyNode from, TopologyNode to) {
+               Set<TopologyNode> seen = new HashSet<>();
+               Queue<TopologyNode> pending = new LinkedList<>();
+               seen.add(from);
+               pending.offer(from);
+               while (!pending.isEmpty()) {
+                       TopologyNode current = pending.poll();
+                       if (to.equals(current)) {
+                               return true;
+                       }
+                       for (TopologyNode next : current.outputs) {
+                               // Set.add returns false for nodes already seen, so each node is queued at most once
+                               if (seen.add(next)) {
+                                       pending.offer(next);
+                               }
+                       }
+               }
+               return false;
+       }
+       private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) {

Review comment:
       Should this be renamed to `getOrCreateTopologyNode`?

File path: 
@@ -0,0 +1,52 @@
+package org.apache.flink.table.planner.plan.processor;

Review comment:
       Please move it to `org.apache.flink.table.planner.plan.nodes.process`.

