noorall commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1824044929


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/forwardgroup/ForwardGroupComputeUtil.java:
##########
@@ -101,6 +105,96 @@ public static Map<JobVertexID, ForwardGroup> 
computeForwardGroups(
         return ret;
     }
 
+    @VisibleForTesting
+    public static Map<Integer, StreamNodeForwardGroup>
+            computeStreamNodeForwardGroupAndCheckParallelism(
+                    final Map<StreamNode, List<StreamNode>>
+                            topologicallySortedChainedStreamNodesMap,
+                    final Function<StreamNode, Set<StreamNode>> 
forwardProducersRetriever) {
+        final Map<Integer, StreamNodeForwardGroup> forwardGroupsByStartNodeId =
+                computeStreamNodeForwardGroup(
+                        topologicallySortedChainedStreamNodesMap, 
forwardProducersRetriever);
+        topologicallySortedChainedStreamNodesMap
+                .keySet()
+                .forEach(
+                        startNode -> {
+                            StreamNodeForwardGroup forwardGroup =
+                                    
forwardGroupsByStartNodeId.get(startNode.getId());
+                            if (forwardGroup != null && 
forwardGroup.isParallelismDecided()) {
+                                checkState(
+                                        startNode.getParallelism()
+                                                == 
forwardGroup.getParallelism());
+                            }
+                        });
+        return forwardGroupsByStartNodeId;
+    }
+
+    public static Map<Integer, StreamNodeForwardGroup> 
computeStreamNodeForwardGroup(
+            final Map<StreamNode, List<StreamNode>> 
topologicallySortedChainedStreamNodesMap,
+            final Function<StreamNode, Set<StreamNode>> 
forwardProducersRetriever) {
+        final Map<StreamNode, Set<StreamNode>> nodeToGroup = new 
IdentityHashMap<>();
+        for (StreamNode currentNode : 
topologicallySortedChainedStreamNodesMap.keySet()) {
+            Set<StreamNode> currentGroup = new HashSet<>();
+            currentGroup.add(currentNode);
+            nodeToGroup.put(currentNode, currentGroup);
+            for (StreamNode producerNode : 
forwardProducersRetriever.apply(currentNode)) {
+                final Set<StreamNode> producerGroup = 
nodeToGroup.get(producerNode);
+                if (producerGroup == null) {
+                    throw new IllegalStateException(
+                            "Producer task "
+                                    + producerNode.getId()
+                                    + " forward group is null"
+                                    + " while calculating forward group for 
the consumer task "
+                                    + currentNode.getId()
+                                    + ". This should be a forward group 
building bug.");
+                }
+                if (currentGroup != producerGroup) {
+                    currentGroup =
+                            VertexGroupComputeUtil.mergeVertexGroups(
+                                    currentGroup, producerGroup, nodeToGroup);
+                }
+            }
+        }
+        final Map<Integer, StreamNodeForwardGroup> result = new HashMap<>();
+        for (Set<StreamNode> nodeGroup : 
VertexGroupComputeUtil.uniqueVertexGroups(nodeToGroup)) {
+            if (!nodeGroup.isEmpty()) {
+                StreamNodeForwardGroup streamNodeForwardGroup =
+                        new StreamNodeForwardGroup(
+                                nodeGroup, 
topologicallySortedChainedStreamNodesMap);
+                for (StreamNode startNode : 
streamNodeForwardGroup.getStartNodes()) {
+                    result.put(startNode.getId(), streamNodeForwardGroup);
+                }
+            }
+        }
+        return result;
+    }
+
+    public static boolean canTargetMergeIntoSourceForwardGroup(
+            ForwardGroup sourceForwardGroup, ForwardGroup targetForwardGroup) {
+        if (sourceForwardGroup == null || targetForwardGroup == null) {
+            return false;
+        }
+
+        if (sourceForwardGroup == targetForwardGroup) {
+            return true;
+        }
+
+        if (sourceForwardGroup.isParallelismDecided()
+                && targetForwardGroup.isParallelismDecided()
+                && sourceForwardGroup.getParallelism() != 
targetForwardGroup.getParallelism()) {
+            return false;
+        }
+
+        if (sourceForwardGroup.isMaxParallelismDecided()
+                && targetForwardGroup.isMaxParallelismDecided()
+                && sourceForwardGroup.getMaxParallelism()
+                        > targetForwardGroup.getMaxParallelism()) {

Review Comment:
   > Why use '>'? Could you add a comment to explain this?
   
   If the max parallelism of the source is larger than that of the target, and 
the decided parallelism is equal to the source's max parallelism, the target can 
never achieve the same parallelism as the source, which may prevent the forward 
edge from working properly. Therefore, we need to use '>'.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to