godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512431935



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/utils/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of 
an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+       private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+       TopologyGraph(List<ExecNode<?, ?>> roots) {
+               this(roots, Collections.emptySet());
+       }
+
+       TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> 
boundaries) {
+               this.nodes = new HashMap<>();
+
+               // we first link all edges in the original exec node graph
+               AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               if (boundaries.contains(node)) {
+                                       return;
+                               }
+                               for (ExecNode<?, ?> input : 
node.getInputNodes()) {
+                                       link(input, node);
+                               }
+                               visitInputs(node);
+                       }
+               };
+               roots.forEach(n -> n.accept(visitor));
+       }
+
+       /**
+        * Link an edge from `from` node to `to` node if no loop will occur 
after adding this edge.
+        * Returns if this edge is successfully added.
+        */
+       boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               if (canReach(toNode, fromNode)) {
+                       // invalid edge, as `to` is the predecessor of `from`
+                       return false;
+               } else {
+                       // link `from` and `to`
+                       fromNode.outputs.add(toNode);
+                       toNode.inputs.add(fromNode);
+                       return true;
+               }
+       }
+
+       /**
+        * Remove the edge from `from` node to `to` node. If there is no edge 
between them then do nothing.
+        */
+       void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               fromNode.outputs.remove(toNode);
+               toNode.inputs.remove(fromNode);
+       }
+
+       /**
+        * Calculate the maximum distance of the currently added nodes from the 
nodes without inputs.
+        * The smallest distance is 0 (which are exactly the nodes without 
inputs) and the distances of
+        * other nodes are the largest distances in their inputs plus 1.
+        */
+       Map<ExecNode<?, ?>, Integer> calculateDistance() {

Review comment:
       Could you give `distance` a formal definition in the Javadoc?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.connector.source.SourceProvider;
+import 
org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import 
org.apache.flink.table.planner.plan.processor.utils.InputOrderCalculator;
+import 
org.apache.flink.table.planner.plan.processor.utils.InputPriorityConflictResolver;
+import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Union;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input 
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a 
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI";>design
 doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+       private final boolean isStreaming;
+
+       public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+               this.isStreaming = isStreaming;
+       }
+
+       @Override
+       public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> roots, 
DAGProcessContext context) {
+               if (!isStreaming) {
+                       // As multiple input nodes use function call to deliver 
records between sub-operators,
+                       // we cannot rely on network buffers to buffer records 
not yet ready to be read,
+                       // so only BLOCKING dam behavior is safe here.
+                       // If conflict is detected under this stricter 
constraint,
+                       // we add a PIPELINED exchange to mark that its input 
and output node cannot be merged
+                       // into the same multiple input node
+                       InputPriorityConflictResolver resolver = new 
InputPriorityConflictResolver(
+                               roots,
+                               ExecEdge.DamBehavior.BLOCKING,
+                               ShuffleMode.PIPELINED);
+                       resolver.detectAndResolve();
+               }
+
+               List<ExecNodeWrapper> rootWrappers = wrapExecNodes(roots);
+               // sort all nodes in topological order, sinks come first and 
sources come last
+               List<ExecNodeWrapper> orderedWrappers = 
topologicalSort(rootWrappers);
+               // group nodes into multiple input groups
+               createMultipleInputGroups(orderedWrappers);
+               // apply optimizations to remove unnecessary nodes out of 
multiple input groups
+               optimizeMultipleInputGroups(orderedWrappers);
+
+               // create the real multiple input nodes
+               return createMultipleInputNodes(rootWrappers);
+       }
+
+       // 
--------------------------------------------------------------------------------
+       // Wrapping and Sorting
+       // 
--------------------------------------------------------------------------------
+
+       private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>> 
rootNodes) {
+               Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new 
HashMap<>();
+               AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               ExecNodeWrapper wrapper = 
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+                               for (ExecNode<?, ?> input : 
node.getInputNodes()) {
+                                       ExecNodeWrapper inputWrapper = 
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+                                       wrapper.inputs.add(inputWrapper);
+                                       inputWrapper.outputs.add(wrapper);
+                               }
+                               visitInputs(node);
+                       }
+               };
+               rootNodes.forEach(s -> s.accept(visitor));
+
+               List<ExecNodeWrapper> rootWrappers = new ArrayList<>();
+               for (ExecNode<?, ?> root : rootNodes) {
+                       ExecNodeWrapper rootWrapper = wrapperMap.get(root);
+                       Preconditions.checkNotNull(rootWrapper, "Root node is 
not wrapped. This is a bug.");
+                       rootWrappers.add(rootWrapper);
+               }
+               return rootWrappers;
+       }
+
+       private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper> 
rootWrappers) {
+               List<ExecNodeWrapper> result = new ArrayList<>();
+               Queue<ExecNodeWrapper> queue = new LinkedList<>(rootWrappers);
+               Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+               while (!queue.isEmpty()) {
+                       ExecNodeWrapper wrapper = queue.poll();
+                       result.add(wrapper);
+                       for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+                               int visitCount = 
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+                               if (visitCount == inputWrapper.outputs.size()) {
+                                       queue.offer(inputWrapper);
+                               }
+                       }
+               }
+
+               return result;
+       }
+
+       // 
--------------------------------------------------------------------------------
+       // Multiple Input Groups Creating
+       // 
--------------------------------------------------------------------------------
+
+       private void createMultipleInputGroups(List<ExecNodeWrapper> 
orderedWrappers) {
+               // wrappers are checked in topological order from sinks to 
sources
+               for (ExecNodeWrapper wrapper : orderedWrappers) {
+                       // we skip nodes which cannot be a member of a multiple 
input node
+                       if (!canBeMultipleInputNodeMember(wrapper)) {
+                               continue;
+                       }
+
+                       // we first try to assign this wrapper into the same 
group with its outputs
+                       MultipleInputGroup outputGroup = 
canBeInSameGroupWithOutputs(wrapper);
+                       if (outputGroup != null) {
+                               wrapper.addToGroup(outputGroup);
+                               continue;
+                       }
+
+                       // we then try to create a new multiple input group 
with this node as the root
+                       if (canBeRootOfMultipleInputGroup(wrapper)) {
+                               wrapper.createGroup();
+                       }
+
+                       // all our attempts failed, this node will not be in a 
multiple input node
+               }
+       }
+
+       private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+               if (wrapper.inputs.isEmpty()) {
+                       // sources cannot be a member of multiple input node
+                       return false;
+               }
+               if (wrapper.execNode instanceof Exchange) {
+                       // exchange cannot be a member of multiple input node
+                       return false;
+               }
+
+               return true;
+       }
+
+       /**
+        * A node can only be assigned into the same multiple input group of 
its outputs
+        * if all outputs have a group and are the same.
+        *
+        * @return the {@link MultipleInputGroup} of the outputs if all outputs 
have a
+        *         group and are the same, null otherwise
+        */
+       private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper 
wrapper) {
+               if (wrapper.outputs.isEmpty()) {
+                       return null;
+               }
+
+               MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+               if (outputGroup == null) {
+                       return null;
+               }
+
+               for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+                       if (outputWrapper.group != outputGroup) {
+                               return null;
+                       }
+               }
+
+               return outputGroup;
+       }
+
+       private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+               // only a node with more than one input can be the root,
+               // as one-input operator chaining are handled by operator chains
+               return wrapper.inputs.size() >= 2;
+       }
+
+       // 
--------------------------------------------------------------------------------
+       // Multiple Input Groups Optimizing
+       // 
--------------------------------------------------------------------------------
+
+       private void optimizeMultipleInputGroups(List<ExecNodeWrapper> 
orderedWrappers) {
+               // wrappers are checked in topological order from sources to 
sinks
+               for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+                       ExecNodeWrapper wrapper = orderedWrappers.get(i);
+                       MultipleInputGroup group = wrapper.group;
+                       if (group == null) {
+                               // we only consider nodes currently in a 
multiple input group
+                               continue;
+                       }
+
+                       boolean isUnion = wrapper.execNode instanceof Union;
+
+                       if (group.members.size() == 1) {
+                               Preconditions.checkState(
+                                       wrapper == group.root,
+                                       "The only member of a multiple input 
group is not its root. This is a bug.");
+                               // optimization 1. we clean up multiple input 
groups with only 1 member,
+                               // unless one of its input is a FLIP-27 source 
(for maximizing source chaining),
+                               // however unions do not apply to this 
optimization because they're not real operators
+                               if (isUnion || 
wrapper.inputs.stream().noneMatch(
+                                               inputWrapper -> 
isChainableSource(inputWrapper.execNode))) {
+                                       wrapper.removeFromGroup();
+                               }
+                               continue;
+                       }
+
+                       if (!isEntranceOfMultipleInputGroup(wrapper)) {
+                               // we're not removing a node from the middle of 
a multiple input group
+                               continue;
+                       }
+
+                       boolean shouldRemove = false;
+                       if (isUnion) {
+                               // optimization 2. we do not allow union to be 
the tail of a multiple input
+                               // as we're paying extra function calls for 
this, unless one of the united
+                               // input is a FLIP-27 source
+                               shouldRemove = 
wrapper.inputs.stream().noneMatch(
+                                       inputWrapper -> 
isChainableSource(inputWrapper.execNode));
+                       } else if (wrapper.inputs.size() == 1) {
+                               // optimization 3. for one-input operators 
we'll remove it unless its input
+                               // is an exchange or a FLIP-27 source, this is 
mainly to avoid the following
+                               // pattern:
+                               // non-chainable source -> calc --\
+                               //                                 join ->
+                               // non-chainable source -> calc --/
+                               // if we move two calcs into the multiple input 
group rooted at the join, we're
+                               // directly shuffling large amount of records 
from the source without filtering
+                               // by the calc
+                               ExecNode<?, ?> input = 
wrapper.inputs.get(0).execNode;
+                               shouldRemove = !(input instanceof Exchange) && 
!isChainableSource(input);
+                       }
+
+                       // optimization 4. for singleton operations (for 
example singleton global agg)
+                       // we're not including it into the multiple input node 
as we have to ensure that
+                       // the whole multiple input can only have 1 parallelism.
+                       // continuous singleton operations connected by 
forwarding shuffle will be dealt
+                       // together with optimization 3
+                       shouldRemove |= 
wrapper.inputs.stream().anyMatch(inputWrapper ->
+                               inputWrapper.execNode instanceof 
BatchExecExchange &&

Review comment:
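       Should this also consider `StreamExecExchange`? Checking against the more general
`Exchange` type instead of `BatchExecExchange` (as `canBeMultipleInputNodeMember` already does) would cover both.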

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/processor/utils/InputPriorityConflictResolverTest.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests for {@link InputPriorityConflictResolver}.
+ */
+public class InputPriorityConflictResolverTest {
+
+       @Test
+       public void testDetectAndResolve() {
+               // P = ExecEdge.DamBehavior.PIPELINED, E = 
ExecEdge.DamBehavior.END_INPUT
+               // P100 = PIPELINED + priority 100
+               //
+               // 0 --------(P0)----> 1 --(P0)-----------> 7
+               //  \                    \-(P0)-> 2 -(P0)--/
+               //   \-------(P0)----> 3 --(P1)-----------/
+               //    \------(P0)----> 4 --(P10)---------/
+               //     \              /                 /
+               //      \    8 -(P0)-<                 /
+               //       \            \               /
+               //        \--(E0)----> 5 --(P10)-----/
+               // 6 ---------(P100)----------------/
+               TestingBatchExecNode[] nodes = new TestingBatchExecNode[9];
+               for (int i = 0; i < nodes.length; i++) {
+                       nodes[i] = new TestingBatchExecNode();
+               }
+               nodes[1].addInput(nodes[0], 
ExecEdge.builder().priority(0).build());
+               nodes[2].addInput(nodes[1], 
ExecEdge.builder().priority(0).build());
+               nodes[3].addInput(nodes[0], 
ExecEdge.builder().priority(0).build());
+               nodes[4].addInput(nodes[8], 
ExecEdge.builder().priority(0).build());
+               nodes[4].addInput(nodes[0], 
ExecEdge.builder().priority(0).build());
+               nodes[5].addInput(nodes[8], 
ExecEdge.builder().priority(0).build());
+               nodes[5].addInput(nodes[0], 
ExecEdge.builder().damBehavior(ExecEdge.DamBehavior.END_INPUT).priority(0).build());
+               nodes[7].addInput(nodes[1], 
ExecEdge.builder().priority(0).build());
+               nodes[7].addInput(nodes[2], 
ExecEdge.builder().priority(0).build());
+               nodes[7].addInput(nodes[3], 
ExecEdge.builder().priority(1).build());
+               nodes[7].addInput(nodes[4], 
ExecEdge.builder().priority(10).build());
+               nodes[7].addInput(nodes[5], 
ExecEdge.builder().priority(10).build());
+               nodes[7].addInput(nodes[6], 
ExecEdge.builder().priority(100).build());
+
+               InputPriorityConflictResolver resolver = new 
InputPriorityConflictResolver(
+                       Collections.singletonList(nodes[7]),
+                       ExecEdge.DamBehavior.END_INPUT,
+                       ShuffleMode.BATCH);
+               resolver.detectAndResolve();
+               Assert.assertEquals(nodes[1], nodes[7].getInputNodes().get(0));
+               Assert.assertEquals(nodes[2], nodes[7].getInputNodes().get(1));
+               Assert.assertTrue(nodes[7].getInputNodes().get(2) instanceof 
BatchExecExchange);

Review comment:
       Should the test also check the shuffle mode of the inserted exchange? The resolver is constructed with `ShuffleMode.BATCH`.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala
##########
@@ -27,12 +27,12 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecEdge,
 import org.apache.flink.table.planner.plan.nodes.physical.MultipleInputRel
 import 
org.apache.flink.table.runtime.operators.multipleinput.{BatchMultipleInputStreamOperatorFactory,
 TableOperatorWrapperGenerator}
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
-
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-
 import java.util
 
+import org.apache.flink.streaming.api.operators.ChainingStrategy

Review comment:
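       Please reorder the imports: the new `ChainingStrategy` import belongs in the `org.apache.flink` group above, and the blank lines separating the import groups should be kept.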

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/utils/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of 
an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+       private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+       TopologyGraph(List<ExecNode<?, ?>> roots) {
+               this(roots, Collections.emptySet());
+       }
+
+       TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> 
boundaries) {
+               this.nodes = new HashMap<>();
+
+               // we first link all edges in the original exec node graph
+               AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               if (boundaries.contains(node)) {
+                                       return;
+                               }
+                               for (ExecNode<?, ?> input : 
node.getInputNodes()) {
+                                       link(input, node);
+                               }
+                               visitInputs(node);
+                       }
+               };
+               roots.forEach(n -> n.accept(visitor));
+       }
+
+       /**
+        * Link an edge from `from` node to `to` node if no loop will occur 
after adding this edge.
+        * Returns if this edge is successfully added.
+        */
+       boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               if (canReach(toNode, fromNode)) {
+                       // invalid edge, as `to` is the predecessor of `from`
+                       return false;
+               } else {
+                       // link `from` and `to`
+                       fromNode.outputs.add(toNode);
+                       toNode.inputs.add(fromNode);
+                       return true;
+               }
+       }
+
+       /**
+        * Remove the edge from `from` node to `to` node. If there is no edge 
between them then do nothing.
+        */
+       void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               fromNode.outputs.remove(toNode);
+               toNode.inputs.remove(fromNode);
+       }
+
+       /**
+        * Calculate the maximum distance of the currently added nodes from the 
nodes without inputs.
+        * The smallest distance is 0 (which are exactly the nodes without 
inputs) and the distances of
+        * other nodes are the largest distances in their inputs plus 1.
+        */
+       Map<ExecNode<?, ?>, Integer> calculateDistance() {
+               Map<ExecNode<?, ?>, Integer> result = new HashMap<>();
+               Map<TopologyNode, Integer> inputsVisitedMap = new HashMap<>();
+
+               Queue<TopologyNode> queue = new LinkedList<>();
+               for (TopologyNode node : nodes.values()) {
+                       if (node.inputs.size() == 0) {
+                               queue.offer(node);
+                       }
+               }
+
+               while (!queue.isEmpty()) {
+                       TopologyNode node = queue.poll();
+                       int dist = -1;
+                       for (TopologyNode input : node.inputs) {
+                               dist = Math.max(
+                                               dist,
+                                       Preconditions.checkNotNull(
+                                               result.get(input.execNode),
+                                               "The distance of an input node 
is not calculated. This is a bug."));
+                       }
+                       dist++;
+                       result.put(node.execNode, dist);
+
+                       for (TopologyNode output : node.outputs) {
+                               int inputsVisited = 
inputsVisitedMap.compute(output, (k, v) -> v == null ? 1 : v + 1);
+                               if (inputsVisited == output.inputs.size()) {
+                                       queue.offer(output);
+                               }
+                       }
+               }
+
+               return result;
+       }
+
+       @VisibleForTesting
+       boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+               return canReach(fromNode, toNode);
+       }
+
+       private boolean canReach(TopologyNode from, TopologyNode to) {
+               Set<TopologyNode> visited = new HashSet<>();
+               visited.add(from);
+               Queue<TopologyNode> queue = new LinkedList<>();
+               queue.offer(from);
+
+               while (!queue.isEmpty()) {
+                       TopologyNode node = queue.poll();
+                       if (to.equals(node)) {
+                               return true;
+                       }
+
+                       for (TopologyNode next : node.outputs) {
+                               if (visited.contains(next)) {
+                                       continue;
+                               }
+                               visited.add(next);
+                               queue.offer(next);
+                       }
+               }
+
+               return false;
+       }
+
+       private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) {

Review comment:
       Rename this to `getOrCreateTopologyNode`? It also creates the node if it is not yet in `nodes`.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/processor/DeadlockBreakupProcessor.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.processor;

Review comment:
       Please move it to the `org.apache.flink.table.planner.plan.nodes.process` package, alongside `DAGProcessor` and `DAGProcessContext`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to