noorall commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1824044929
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroupComputeUtil.java: ##########
@@ -101,6 +105,96 @@ public static Map<JobVertexID, ForwardGroup> computeForwardGroups(
         return ret;
     }
 
+    @VisibleForTesting
+    public static Map<Integer, StreamNodeForwardGroup>
+            computeStreamNodeForwardGroupAndCheckParallelism(
+                    final Map<StreamNode, List<StreamNode>>
+                            topologicallySortedChainedStreamNodesMap,
+                    final Function<StreamNode, Set<StreamNode>> forwardProducersRetriever) {
+        final Map<Integer, StreamNodeForwardGroup> forwardGroupsByStartNodeId =
+                computeStreamNodeForwardGroup(
+                        topologicallySortedChainedStreamNodesMap, forwardProducersRetriever);
+        topologicallySortedChainedStreamNodesMap
+                .keySet()
+                .forEach(
+                        startNode -> {
+                            StreamNodeForwardGroup forwardGroup =
+                                    forwardGroupsByStartNodeId.get(startNode.getId());
+                            if (forwardGroup != null && forwardGroup.isParallelismDecided()) {
+                                checkState(
+                                        startNode.getParallelism()
+                                                == forwardGroup.getParallelism());
+                            }
+                        });
+        return forwardGroupsByStartNodeId;
+    }
+
+    public static Map<Integer, StreamNodeForwardGroup> computeStreamNodeForwardGroup(
+            final Map<StreamNode, List<StreamNode>> topologicallySortedChainedStreamNodesMap,
+            final Function<StreamNode, Set<StreamNode>> forwardProducersRetriever) {
+        final Map<StreamNode, Set<StreamNode>> nodeToGroup = new IdentityHashMap<>();
+        for (StreamNode currentNode : topologicallySortedChainedStreamNodesMap.keySet()) {
+            Set<StreamNode> currentGroup = new HashSet<>();
+            currentGroup.add(currentNode);
+            nodeToGroup.put(currentNode, currentGroup);
+            for (StreamNode producerNode : forwardProducersRetriever.apply(currentNode)) {
+                final Set<StreamNode> producerGroup = nodeToGroup.get(producerNode);
+                if (producerGroup == null) {
+                    throw new IllegalStateException(
+                            "Producer task "
+                                    + producerNode.getId()
+                                    + " forward group is null"
+                                    + " while calculating forward group for the consumer task "
+                                    + currentNode.getId()
+                                    + ". This should be a forward group building bug.");
+                }
+                if (currentGroup != producerGroup) {
+                    currentGroup =
+                            VertexGroupComputeUtil.mergeVertexGroups(
+                                    currentGroup, producerGroup, nodeToGroup);
+                }
+            }
+        }
+        final Map<Integer, StreamNodeForwardGroup> result = new HashMap<>();
+        for (Set<StreamNode> nodeGroup : VertexGroupComputeUtil.uniqueVertexGroups(nodeToGroup)) {
+            if (!nodeGroup.isEmpty()) {
+                StreamNodeForwardGroup streamNodeForwardGroup =
+                        new StreamNodeForwardGroup(
+                                nodeGroup, topologicallySortedChainedStreamNodesMap);
+                for (StreamNode startNode : streamNodeForwardGroup.getStartNodes()) {
+                    result.put(startNode.getId(), streamNodeForwardGroup);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static boolean canTargetMergeIntoSourceForwardGroup(
+            ForwardGroup sourceForwardGroup, ForwardGroup targetForwardGroup) {
+        if (sourceForwardGroup == null || targetForwardGroup == null) {
+            return false;
+        }
+
+        if (sourceForwardGroup == targetForwardGroup) {
+            return true;
+        }
+
+        if (sourceForwardGroup.isParallelismDecided()
+                && targetForwardGroup.isParallelismDecided()
+                && sourceForwardGroup.getParallelism() != targetForwardGroup.getParallelism()) {
+            return false;
+        }
+
+        if (sourceForwardGroup.isMaxParallelismDecided()
+                && targetForwardGroup.isMaxParallelismDecided()
+                && sourceForwardGroup.getMaxParallelism()
+                        > targetForwardGroup.getMaxParallelism()) {

Review Comment:
   > Why use '>'? Could you add a comment to explain this?
If the max parallelism of the source is larger than that of the target, and the decided parallelism equals the max parallelism, then the target can never reach the same parallelism as the source, which may prevent the forward edge from working properly. Therefore, we need to use '>'.
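For illustration, here is a minimal, self-contained Java sketch of that argument. It does not use Flink's `ForwardGroup` classes; the `Group` type, the helper method, and the numbers 128/64 are hypothetical placeholders:

```java
/**
 * Sketch of why the max-parallelism comparison should be '>' rather than '>='.
 * All types and values here are illustrative, not Flink code.
 */
public class MaxParallelismCheckSketch {

    /** Simplified stand-in for a forward group's parallelism properties. */
    static final class Group {
        final int maxParallelism;  // upper bound the group can ever be scaled to
        final Integer parallelism; // decided parallelism, or null if not decided yet

        Group(int maxParallelism, Integer parallelism) {
            this.maxParallelism = maxParallelism;
            this.parallelism = parallelism;
        }
    }

    /** Mirrors only the '>' condition from canTargetMergeIntoSourceForwardGroup. */
    static boolean maxParallelismAllowsMerge(Group source, Group target) {
        return !(source.maxParallelism > target.maxParallelism);
    }

    public static void main(String[] args) {
        // The source decides parallelism 128, equal to its max. The target's max
        // is only 64, so it can never be rescaled to 128; a forward edge between
        // them could not keep its one-to-one subtask mapping.
        Group source = new Group(128, 128);
        Group target = new Group(64, null);
        System.out.println(maxParallelismAllowsMerge(source, target)); // false

        // Equal max parallelism is still mergeable, which is why '>=' would be too strict.
        Group sameMaxTarget = new Group(128, null);
        System.out.println(maxParallelismAllowsMerge(source, sameMaxTarget)); // true
    }
}
```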