godfreyhe commented on a change in pull request #13742: URL: https://github.com/apache/flink/pull/13742#discussion_r512062541
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java ########## @@ -88,41 +76,58 @@ * * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates - * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode. + * a deadlock, and different subclasses of this class resolve the conflict in different ways. * * <p>For a detailed explanation of the algorithm, see appendix of the * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>. */ @Internal -public class InputPriorityConflictResolver { +public abstract class AbstractInputPriorityConflictResolver { private final List<ExecNode<?, ?>> roots; + private final Set<ExecNode<?, ?>> boundaries; + private final ExecEdge.DamBehavior safeDamBehavior; - private TopologyGraph graph; + protected TopologyGraph graph; - public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) { + /** + * Create an {@link AbstractInputPriorityConflictResolver} for the given {@link ExecNode} sub-graph. 
+ * + * @param roots the first layer of nodes on the output side of the sub-graph + * @param boundaries the first layer of nodes on the input side of the sub-graph + * @param safeDamBehavior when checking for conflicts we'll ignore the edges with + * {@link ExecEdge.DamBehavior} stricter or equal than this + */ + public AbstractInputPriorityConflictResolver( + List<ExecNode<?, ?>> roots, + Set<ExecNode<?, ?>> boundaries, + ExecEdge.DamBehavior safeDamBehavior) { Preconditions.checkArgument( roots.stream().allMatch(root -> root instanceof BatchExecNode), "InputPriorityConflictResolver can only be used for batch jobs."); this.roots = roots; + this.boundaries = boundaries; + this.safeDamBehavior = safeDamBehavior; } - public void detectAndResolve() { + protected void createTopologyGraphAndResolveConflict() { Review comment: createTopologyGraph? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java ########## @@ -88,41 +76,58 @@ * * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates - * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode. + * a deadlock, and different subclasses of this class resolve the conflict in different ways. * * <p>For a detailed explanation of the algorithm, see appendix of the * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>. 
*/ @Internal -public class InputPriorityConflictResolver { +public abstract class AbstractInputPriorityConflictResolver { private final List<ExecNode<?, ?>> roots; + private final Set<ExecNode<?, ?>> boundaries; + private final ExecEdge.DamBehavior safeDamBehavior; - private TopologyGraph graph; + protected TopologyGraph graph; - public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) { + /** + * Create an {@link AbstractInputPriorityConflictResolver} for the given {@link ExecNode} sub-graph. + * + * @param roots the first layer of nodes on the output side of the sub-graph + * @param boundaries the first layer of nodes on the input side of the sub-graph + * @param safeDamBehavior when checking for conflicts we'll ignore the edges with + * {@link ExecEdge.DamBehavior} stricter or equal than this + */ + public AbstractInputPriorityConflictResolver( + List<ExecNode<?, ?>> roots, + Set<ExecNode<?, ?>> boundaries, + ExecEdge.DamBehavior safeDamBehavior) { Preconditions.checkArgument( roots.stream().allMatch(root -> root instanceof BatchExecNode), "InputPriorityConflictResolver can only be used for batch jobs."); this.roots = roots; + this.boundaries = boundaries; + this.safeDamBehavior = safeDamBehavior; } - public void detectAndResolve() { + protected void createTopologyGraphAndResolveConflict() { // build an initial topology graph - graph = new TopologyGraph(roots); + graph = new TopologyGraph(roots, boundaries); // check and resolve conflicts about input priorities AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new AbstractExecNodeExactlyOnceVisitor() { @Override protected void visitNode(ExecNode<?, ?> node) { - visitInputs(node); - checkInputPriorities(node); + if (!boundaries.contains(node)) { + visitInputs(node); + } + updateTopologyGraphAndResolveConflict(node); } }; roots.forEach(n -> n.accept(inputPriorityVisitor)); } - private void checkInputPriorities(ExecNode<?, ?> node) { + private void updateTopologyGraphAndResolveConflict(ExecNode<?, 
?> node) { Review comment: updateTopologyGraph ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/TopologyGraph.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.reuse; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * A data structure storing the topological and input priority information of an {@link ExecNode} graph. 
+ */ +@Internal +class TopologyGraph { + + private final Map<ExecNode<?, ?>, TopologyNode> nodes; + + TopologyGraph(List<ExecNode<?, ?>> roots) { + this(roots, Collections.emptySet()); + } + + TopologyGraph(List<ExecNode<?, ?>> roots, Set<ExecNode<?, ?>> boundaries) { + this.nodes = new HashMap<>(); + + // we first link all edges in the original exec node graph + AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() { + @Override + protected void visitNode(ExecNode<?, ?> node) { + if (boundaries.contains(node)) { + return; + } + for (ExecNode<?, ?> input : node.getInputNodes()) { + link(input, node); + } + visitInputs(node); + } + }; + roots.forEach(n -> n.accept(visitor)); + } + + /** + * Link an edge from `from` node to `to` node if no loop will occur after adding this edge. + * Returns if this edge is successfully added. + */ + boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) { + TopologyNode fromNode = getTopologyNode(from); + TopologyNode toNode = getTopologyNode(to); + + if (canReach(toNode, fromNode)) { + // invalid edge, as `to` is the predecessor of `from` + return false; + } else { + // link `from` and `to` + fromNode.outputs.add(toNode); + toNode.inputs.add(fromNode); + return true; + } + } + + /** + * Remove the edge from `from` node to `to` node. If there is no edge between them then do nothing. + */ + void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) { + TopologyNode fromNode = getTopologyNode(from); + TopologyNode toNode = getTopologyNode(to); + + fromNode.outputs.remove(toNode); + toNode.inputs.remove(fromNode); + } + + /** + * Calculate the maximum distance of the currently added nodes from the nodes without inputs. 
+ * The smallest order is 0 (which are exactly the nodes without inputs) and the distances of Review comment: order -> distance ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java ########## @@ -88,41 +76,58 @@ * * <p>This class maintains a topological graph in which an edge pointing from vertex A to vertex B indicates * that the results from vertex A need to be read before those from vertex B. A loop in the graph indicates - * a deadlock, and we resolve such deadlock by inserting a {@link BatchExecExchange} with batch shuffle mode. + * a deadlock, and different subclasses of this class resolve the conflict in different ways. * * <p>For a detailed explanation of the algorithm, see appendix of the * <a href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design doc</a>. */ @Internal -public class InputPriorityConflictResolver { +public abstract class AbstractInputPriorityConflictResolver { Review comment: How about `InputPriorityBasedTopologyGraphGenerator`? We could even simplify it to `TopologyGraphGenerator`. ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecMultipleInputNode.scala ########## @@ -99,6 +100,14 @@ class BatchExecMultipleInputNode( val memoryKB = generator.getManagedMemoryWeight ExecNode.setManagedMemoryWeight(multipleInputTransform, memoryKB * 1024) + if (withSourceChaining) { + // set chaining strategy for source chaining + multipleInputTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES) Review comment: What if we always set the ChainingStrategy to `HEAD_WITH_SOURCES`?
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputOrderCalculator.java ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.reuse; Review comment: the package name is not correct ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolverWithExchange.java ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.reuse; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange; +import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; + +import org.apache.calcite.rel.RelNode; + +import java.util.Collections; +import java.util.List; + +/** + * Subclass of the {@link AbstractInputPriorityConflictResolver}. + * + * <p>This class resolve conflicts by inserting a {@link BatchExecExchange} into the conflicting input. + */ +@Internal +public class InputPriorityConflictResolverWithExchange extends AbstractInputPriorityConflictResolver { Review comment: InputPriorityConflictResolver ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/AbstractInputPriorityConflictResolver.java ########## @@ -205,146 +205,5 @@ protected void visitNode(ExecNode<?, ?> node) { return ret; } - private BatchExecExchange createExchange(ExecNode<?, ?> node, int idx) { - RelNode inputRel = (RelNode) node.getInputNodes().get(idx); - - FlinkRelDistribution distribution; - ExecEdge.RequiredShuffle requiredShuffle = node.getInputEdges().get(idx).getRequiredShuffle(); - if (requiredShuffle.getType() == ExecEdge.ShuffleType.HASH) { - distribution = FlinkRelDistribution.hash(requiredShuffle.getKeys(), true); - } else if (requiredShuffle.getType() == ExecEdge.ShuffleType.BROADCAST) { - // should not occur - throw new IllegalStateException( - "Trying to resolve input priority conflict on broadcast side. 
This is not expected."); - } else if (requiredShuffle.getType() == ExecEdge.ShuffleType.SINGLETON) { - distribution = FlinkRelDistribution.SINGLETON(); - } else { - distribution = FlinkRelDistribution.ANY(); - } - - BatchExecExchange exchange = new BatchExecExchange( - inputRel.getCluster(), - inputRel.getTraitSet().replace(distribution), - inputRel, - distribution); - exchange.setRequiredShuffleMode(ShuffleMode.BATCH); - return exchange; - } - - /** - * A data structure storing the topological information of an {@link ExecNode} graph. - */ - @VisibleForTesting - static class TopologyGraph { - private final Map<ExecNode<?, ?>, TopologyNode> nodes; - - TopologyGraph(List<ExecNode<?, ?>> roots) { - this.nodes = new HashMap<>(); - - // we first link all edges in the original exec node graph - AbstractExecNodeExactlyOnceVisitor visitor = new AbstractExecNodeExactlyOnceVisitor() { - @Override - protected void visitNode(ExecNode<?, ?> node) { - for (ExecNode<?, ?> input : node.getInputNodes()) { - link(input, node); - } - visitInputs(node); - } - }; - roots.forEach(n -> n.accept(visitor)); - } - - /** - * Link an edge from `from` node to `to` node if no loop will occur after adding this edge. - * Returns if this edge is successfully added. - */ - boolean link(ExecNode<?, ?> from, ExecNode<?, ?> to) { - TopologyNode fromNode = getTopologyNode(from); - TopologyNode toNode = getTopologyNode(to); - - if (canReach(toNode, fromNode)) { - // invalid edge, as `to` is the predecessor of `from` - return false; - } else { - // link `from` and `to` - fromNode.outputs.add(toNode); - toNode.inputs.add(fromNode); - return true; - } - } - - /** - * Remove the edge from `from` node to `to` node. If there is no edge between them then do nothing. 
- */ - void unlink(ExecNode<?, ?> from, ExecNode<?, ?> to) { - TopologyNode fromNode = getTopologyNode(from); - TopologyNode toNode = getTopologyNode(to); - - fromNode.outputs.remove(toNode); - toNode.inputs.remove(fromNode); - } - - @VisibleForTesting - boolean canReach(ExecNode<?, ?> from, ExecNode<?, ?> to) { - TopologyNode fromNode = getTopologyNode(from); - TopologyNode toNode = getTopologyNode(to); - return canReach(fromNode, toNode); - } - - private boolean canReach(TopologyNode from, TopologyNode to) { - Set<TopologyNode> visited = new HashSet<>(); - visited.add(from); - Queue<TopologyNode> queue = new LinkedList<>(); - queue.offer(from); - - while (!queue.isEmpty()) { - TopologyNode node = queue.poll(); - if (to.equals(node)) { - return true; - } - - for (TopologyNode next : node.outputs) { - if (visited.contains(next)) { - continue; - } - visited.add(next); - queue.offer(next); - } - } - - return false; - } - - private TopologyNode getTopologyNode(ExecNode<?, ?> execNode) { - // NOTE: We treat different `BatchExecBoundedStreamScan`s with same `DataStream` object as the same - if (execNode instanceof BatchExecBoundedStreamScan) { - DataStream<?> currentStream = - ((BatchExecBoundedStreamScan) execNode).boundedStreamTable().dataStream(); - for (Map.Entry<ExecNode<?, ?>, TopologyNode> entry : nodes.entrySet()) { - ExecNode<?, ?> key = entry.getKey(); - if (key instanceof BatchExecBoundedStreamScan) { - DataStream<?> existingStream = - ((BatchExecBoundedStreamScan) key).boundedStreamTable().dataStream(); - if (existingStream.equals(currentStream)) { - return entry.getValue(); - } - } - } - - TopologyNode result = new TopologyNode(); - nodes.put(execNode, result); - return result; - } else { - return nodes.computeIfAbsent(execNode, k -> new TopologyNode()); - } - } - } - - /** - * A node in the {@link TopologyGraph}. 
- */ - private static class TopologyNode { - private final Set<TopologyNode> inputs = new HashSet<>(); - private final Set<TopologyNode> outputs = new HashSet<>(); - } + protected abstract void resolveConflict(ExecNode<?, ?> node, int conflictInput); Review comment: resolveInputPriorityConflict ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org