AHeise commented on a change in pull request #15013:
URL: https://github.com/apache/flink/pull/15013#discussion_r582764397



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -347,7 +348,13 @@ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>>
         final Map<Integer, OperatorChainInfo> chainEntryPoints =
                 buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
         final Collection<OperatorChainInfo> initialEntryPoints =
-                new ArrayList<>(chainEntryPoints.values());
+                chainEntryPoints.values().stream()
+                        .sorted(
+                                Comparator.comparing(
+                                        operatorChainInfo ->
+                                                hashes.get(operatorChainInfo.getStartNodeId()),
+                                        StreamingJobGraphGenerator::compareHashes))

Review comment:
   No, `nodeId` is incremented in the `StreamEnvironment` for each added node. So each job vertex has a unique id, which may change on restart (that's why the hashes are used instead). `startNodeId` is the `nodeId` of the start node of the particular chain.
   
   There are also a couple of places where the nodes are indexed by `nodeId`, so duplicate ids would break much more than just this sorting.
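   
   For illustration, here is a small self-contained sketch of the ordering idea, assuming a lexicographic unsigned comparison of the hash bytes. The class name, the `compareHashes` body, and the example node ids/hashes are all hypothetical; only the `Comparator.comparing(keyExtractor, keyComparator)` pattern mirrors the diff above:
   
```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HashOrderingSketch {

    // Hypothetical stand-in for StreamingJobGraphGenerator::compareHashes:
    // unsigned lexicographic comparison of two hash byte arrays.
    static int compareHashes(byte[] left, byte[] right) {
        int length = Math.min(left.length, right.length);
        for (int i = 0; i < length; i++) {
            int cmp = Integer.compare(left[i] & 0xFF, right[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(left.length, right.length);
    }

    public static void main(String[] args) {
        // Hypothetical nodeId -> hash mapping; the real hashes come from the
        // stream graph hasher and stay stable across restarts, unlike nodeIds.
        Map<Integer, byte[]> hashes = new HashMap<>();
        hashes.put(1, new byte[] {0x0B, 0x02});
        hashes.put(2, new byte[] {0x0A, 0x7F});

        List<Integer> startNodeIds = Arrays.asList(1, 2);

        // Sort entry points by their hash so the iteration order is
        // deterministic even if nodeIds differ between submissions.
        List<Integer> sorted =
                startNodeIds.stream()
                        .sorted(
                                Comparator.comparing(
                                        nodeId -> hashes.get(nodeId),
                                        HashOrderingSketch::compareHashes))
                        .collect(Collectors.toList());

        System.out.println(sorted); // prints [2, 1]: node 2's hash sorts first
    }
}
```
   
   The real `compareHashes` in the PR may differ; the point is only that ordering by the stable hash rather than by `nodeId` makes the traversal order reproducible.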




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

