godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r512062541



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from 
vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. 
A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link 
BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in 
different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a 
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
 doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {
 
        private final List<ExecNode<?, ?>> roots;
+       private final Set<ExecNode<?, ?>> boundaries;
+       private final ExecEdge.DamBehavior safeDamBehavior;
 
-       private TopologyGraph graph;
+       protected TopologyGraph graph;
 
-       public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+       /**
+        * Create an {@link AbstractInputPriorityConflictResolver} for the 
given {@link ExecNode} sub-graph.
+        *
+        * @param roots the first layer of nodes on the output side of the 
sub-graph
+        * @param boundaries the first layer of nodes on the input side of the 
sub-graph
+        * @param safeDamBehavior when checking for conflicts we'll ignore the 
edges with
+        *                        {@link ExecEdge.DamBehavior} stricter or 
equal than this
+        */
+       public AbstractInputPriorityConflictResolver(
+                       List<ExecNode<?, ?>> roots,
+                       Set<ExecNode<?, ?>> boundaries,
+                       ExecEdge.DamBehavior safeDamBehavior) {
                Preconditions.checkArgument(
                        roots.stream().allMatch(root -> root instanceof 
BatchExecNode),
                        "InputPriorityConflictResolver can only be used for 
batch jobs.");
                this.roots = roots;
+               this.boundaries = boundaries;
+               this.safeDamBehavior = safeDamBehavior;
        }
 
-       public void detectAndResolve() {
+       protected void createTopologyGraphAndResolveConflict() {

Review comment:
       How about renaming `createTopologyGraphAndResolveConflict` to `createTopologyGraph`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from 
vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. 
A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link 
BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in 
different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a 
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
 doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {
 
        private final List<ExecNode<?, ?>> roots;
+       private final Set<ExecNode<?, ?>> boundaries;
+       private final ExecEdge.DamBehavior safeDamBehavior;
 
-       private TopologyGraph graph;
+       protected TopologyGraph graph;
 
-       public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+       /**
+        * Create an {@link AbstractInputPriorityConflictResolver} for the 
given {@link ExecNode} sub-graph.
+        *
+        * @param roots the first layer of nodes on the output side of the 
sub-graph
+        * @param boundaries the first layer of nodes on the input side of the 
sub-graph
+        * @param safeDamBehavior when checking for conflicts we'll ignore the 
edges with
+        *                        {@link ExecEdge.DamBehavior} stricter or 
equal than this
+        */
+       public AbstractInputPriorityConflictResolver(
+                       List<ExecNode<?, ?>> roots,
+                       Set<ExecNode<?, ?>> boundaries,
+                       ExecEdge.DamBehavior safeDamBehavior) {
                Preconditions.checkArgument(
                        roots.stream().allMatch(root -> root instanceof 
BatchExecNode),
                        "InputPriorityConflictResolver can only be used for 
batch jobs.");
                this.roots = roots;
+               this.boundaries = boundaries;
+               this.safeDamBehavior = safeDamBehavior;
        }
 
-       public void detectAndResolve() {
+       protected void createTopologyGraphAndResolveConflict() {
                // build an initial topology graph
-               graph = new TopologyGraph(roots);
+               graph = new TopologyGraph(roots, boundaries);
 
                // check and resolve conflicts about input priorities
                AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new 
AbstractExecNodeExactlyOnceVisitor() {
                        @Override
                        protected void visitNode(ExecNode<?, ?> node) {
-                               visitInputs(node);
-                               checkInputPriorities(node);
+                               if (!boundaries.contains(node)) {
+                                       visitInputs(node);
+                               }
+                               updateTopologyGraphAndResolveConflict(node);
                        }
                };
                roots.forEach(n -> n.accept(inputPriorityVisitor));
        }
 
-       private void checkInputPriorities(ExecNode<?, ?> node) {
+       private void updateTopologyGraphAndResolveConflict(ExecNode<?, ?> node) 
{

Review comment:
       How about renaming `updateTopologyGraphAndResolveConflict` to `updateTopologyGraph`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/TopologyGraph.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A data structure storing the topological and input priority information of 
an {@link ExecNode} graph.
+ */
+@Internal
+class TopologyGraph {
+
+       private final Map<ExecNode<?, ?>, TopologyNode> nodes;
+
+       TopologyGraph(List<ExecNode<?, ?>> roots) {
+               this(roots, Collections.emptySet());
+       }
+
+       TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> 
boundaries) {
+               this.nodes = new HashMap<>();
+
+               // we first link all edges in the original exec node graph
+               AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
+                       @Override
+                       protected void visitNode(ExecNode<?, ?> node) {
+                               if (boundaries.contains(node)) {
+                                       return;
+                               }
+                               for (ExecNode<?, ?> input : 
node.getInputNodes()) {
+                                       link(input, node);
+                               }
+                               visitInputs(node);
+                       }
+               };
+               roots.forEach(n -> n.accept(visitor));
+       }
+
+       /**
+        * Link an edge from `from` node to `to` node if no loop will occur 
after adding this edge.
+        * Returns if this edge is successfully added.
+        */
+       boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               if (canReach(toNode, fromNode)) {
+                       // invalid edge, as `to` is the predecessor of `from`
+                       return false;
+               } else {
+                       // link `from` and `to`
+                       fromNode.outputs.add(toNode);
+                       toNode.inputs.add(fromNode);
+                       return true;
+               }
+       }
+
+       /**
+        * Remove the edge from `from` node to `to` node. If there is no edge 
between them then do nothing.
+        */
+       void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
+               TopologyNode fromNode = getTopologyNode(from);
+               TopologyNode toNode = getTopologyNode(to);
+
+               fromNode.outputs.remove(toNode);
+               toNode.inputs.remove(fromNode);
+       }
+
+       /**
+        * Calculate the maximum distance of the currently added nodes from the 
nodes without inputs.
+        * The smallest order is 0 (which are exactly the nodes without inputs) 
and the distances of

Review comment:
       Wording in this Javadoc: "The smallest order is 0" should read "The smallest distance is 0" (order -> distance).

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -88,41 +76,58 @@
  *
  * <p>This class maintains a topological graph in which an edge pointing from 
vertex A to vertex B indicates
  * that the results from vertex A need to be read before those from vertex B. 
A loop in the graph indicates
- * a deadlock, and we resolve such deadlock by inserting a {@link 
BatchExecExchange} with batch shuffle mode.
+ * a deadlock, and different subclasses of this class resolve the conflict in 
different ways.
  *
  * <p>For a detailed explanation of the algorithm, see appendix of the
  * <a 
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
 doc</a>.
  */
 @Internal
-public class InputPriorityConflictResolver {
+public abstract class AbstractInputPriorityConflictResolver {

Review comment:
       How about `InputPriorityBasedTopologyGraphGenerator`? We could even simplify it to
`TopologyGraphGenerator`.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala
##########
@@ -99,6 +100,14 @@ class BatchExecMultipleInputNode(
     val memoryKB = generator.getManagedMemoryWeight
     ExecNode.setManagedMemoryWeight(multipleInputTransform, memoryKB * 1024)
 
+    if (withSourceChaining) {
+      // set chaining strategy for source chaining
+      
multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES)

Review comment:
       What if we always set the chaining strategy to `HEAD_WITH_SOURCES`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputOrderCalculator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;

Review comment:
       The package name (`org.apache.flink.table.planner.plan.reuse`) is not correct for this class.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolverWithExchange.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+
+import org.apache.calcite.rel.RelNode;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Subclass of the {@link AbstractInputPriorityConflictResolver}.
+ *
+ * <p>This class resolve conflicts by inserting a {@link BatchExecExchange} 
into the conflicting input.
+ */
+@Internal
+public class InputPriorityConflictResolverWithExchange extends 
AbstractInputPriorityConflictResolver {

Review comment:
       How about naming this class `InputPriorityConflictResolver` instead of `InputPriorityConflictResolverWithExchange`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java
##########
@@ -205,146 +205,5 @@ protected void visitNode(ExecNode<?, ?> node) {
                return ret;
        }
 
-       private BatchExecExchange createExchange(ExecNode<?, ?> node, int idx) {
-               RelNode inputRel = (RelNode) node.getInputNodes().get(idx);
-
-               FlinkRelDistribution distribution;
-               ExecEdge.RequiredShuffle requiredShuffle = 
node.getInputEdges().get(idx).getRequiredShuffle();
-               if (requiredShuffle.getType() == ExecEdge.ShuffleType.HASH) {
-                       distribution = 
FlinkRelDistribution.hash(requiredShuffle.getKeys(), true);
-               } else if (requiredShuffle.getType() == 
ExecEdge.ShuffleType.BROADCAST) {
-                       // should not occur
-                       throw new IllegalStateException(
-                               "Trying to resolve input priority conflict on 
broadcast side. This is not expected.");
-               } else if (requiredShuffle.getType() == 
ExecEdge.ShuffleType.SINGLETON) {
-                       distribution = FlinkRelDistribution.SINGLETON();
-               } else {
-                       distribution = FlinkRelDistribution.ANY();
-               }
-
-               BatchExecExchange exchange = new BatchExecExchange(
-                       inputRel.getCluster(),
-                       inputRel.getTraitSet().replace(distribution),
-                       inputRel,
-                       distribution);
-               exchange.setRequiredShuffleMode(ShuffleMode.BATCH);
-               return exchange;
-       }
-
-       /**
-        * A data structure storing the topological information of an {@link 
ExecNode} graph.
-        */
-       @VisibleForTesting
-       static class TopologyGraph {
-               private final Map<ExecNode<?, ?>, TopologyNode> nodes;
-
-               TopologyGraph(List<ExecNode<?, ?>> roots) {
-                       this.nodes = new HashMap<>();
-
-                       // we first link all edges in the original exec node 
graph
-                       AbstractExecNodeExactlyOnceVisitor visitor = new 
AbstractExecNodeExactlyOnceVisitor() {
-                               @Override
-                               protected void visitNode(ExecNode<?, ?> node) {
-                                       for (ExecNode<?, ?> input : 
node.getInputNodes()) {
-                                               link(input, node);
-                                       }
-                                       visitInputs(node);
-                               }
-                       };
-                       roots.forEach(n -> n.accept(visitor));
-               }
-
-               /**
-                * Link an edge from `from` node to `to` node if no loop will 
occur after adding this edge.
-                * Returns if this edge is successfully added.
-                */
-               boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-                       TopologyNode fromNode = getTopologyNode(from);
-                       TopologyNode toNode = getTopologyNode(to);
-
-                       if (canReach(toNode, fromNode)) {
-                               // invalid edge, as `to` is the predecessor of 
`from`
-                               return false;
-                       } else {
-                               // link `from` and `to`
-                               fromNode.outputs.add(toNode);
-                               toNode.inputs.add(fromNode);
-                               return true;
-                       }
-               }
-
-               /**
-                * Remove the edge from `from` node to `to` node. If there is 
no edge between them then do nothing.
-                */
-               void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-                       TopologyNode fromNode = getTopologyNode(from);
-                       TopologyNode toNode = getTopologyNode(to);
-
-                       fromNode.outputs.remove(toNode);
-                       toNode.inputs.remove(fromNode);
-               }
-
-               @VisibleForTesting
-               boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) {
-                       TopologyNode fromNode = getTopologyNode(from);
-                       TopologyNode toNode = getTopologyNode(to);
-                       return canReach(fromNode, toNode);
-               }
-
-               private boolean canReach(TopologyNode from, TopologyNode to) {
-                       Set<TopologyNode> visited = new HashSet<>();
-                       visited.add(from);
-                       Queue<TopologyNode> queue = new LinkedList<>();
-                       queue.offer(from);
-
-                       while (!queue.isEmpty()) {
-                               TopologyNode node = queue.poll();
-                               if (to.equals(node)) {
-                                       return true;
-                               }
-
-                               for (TopologyNode next : node.outputs) {
-                                       if (visited.contains(next)) {
-                                               continue;
-                                       }
-                                       visited.add(next);
-                                       queue.offer(next);
-                               }
-                       }
-
-                       return false;
-               }
-
-               private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) {
-                       // NOTE: We treat different 
`BatchExecBoundedStreamScan`s with same `DataStream` object as the same
-                       if (execNode instanceof BatchExecBoundedStreamScan) {
-                               DataStream<?> currentStream =
-                                       ((BatchExecBoundedStreamScan) 
execNode).boundedStreamTable().dataStream();
-                               for (Map.Entry<ExecNode<?, ?>, TopologyNode> 
entry : nodes.entrySet()) {
-                                       ExecNode<?, ?> key = entry.getKey();
-                                       if (key instanceof 
BatchExecBoundedStreamScan) {
-                                               DataStream<?> existingStream =
-                                                       
((BatchExecBoundedStreamScan) key).boundedStreamTable().dataStream();
-                                               if 
(existingStream.equals(currentStream)) {
-                                                       return entry.getValue();
-                                               }
-                                       }
-                               }
-
-                               TopologyNode result = new TopologyNode();
-                               nodes.put(execNode, result);
-                               return result;
-                       } else {
-                               return nodes.computeIfAbsent(execNode, k -> new 
TopologyNode());
-                       }
-               }
-       }
-
-       /**
-        * A node in the {@link TopologyGraph}.
-        */
-       private static class TopologyNode {
-               private final Set<TopologyNode> inputs = new HashSet<>();
-               private final Set<TopologyNode> outputs = new HashSet<>();
-       }
+       protected abstract void resolveConflict(ExecNode<?, ?> node, int 
conflictInput);

Review comment:
       How about renaming `resolveConflict` to `resolveInputPriorityConflict`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to