AHeise commented on a change in pull request #15013:
URL: https://github.com/apache/flink/pull/15013#discussion_r582764397
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ##########
@@ -347,7 +348,13 @@ private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>>
         final Map<Integer, OperatorChainInfo> chainEntryPoints =
                 buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
         final Collection<OperatorChainInfo> initialEntryPoints =
-                new ArrayList<>(chainEntryPoints.values());
+                chainEntryPoints.values().stream()
+                        .sorted(
+                                Comparator.comparing(
+                                        operatorChainInfo ->
+                                                hashes.get(operatorChainInfo.getStartNodeId()),
+                                        StreamingJobGraphGenerator::compareHashes))

Review comment:
   No, `nodeId` is incremented in the `StreamEnvironment` for each added node, so each job vertex has a unique id; that id may change on restart, which is why the hashes are used instead. `startNodeId` is the `nodeId` of the head node of the particular chain. There are also a couple of places where the nodes are indexed by `nodeId`, so duplicate ids would break much more than just this sorting.
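   For context, a lexicographic byte-array comparison is one way a helper like the `compareHashes` referenced in the diff could work. Its actual implementation is not shown in this hunk, so the following is only a sketch under that assumption; the class name `HashComparatorSketch` is hypothetical and exists only to make the snippet self-contained.

```java
import java.util.Comparator;

/** Hypothetical, self-contained sketch of a lexicographic byte[] comparator. */
public final class HashComparatorSketch {

    /** Compares two hashes byte by byte; on a common prefix, the shorter array sorts first. */
    static int compareHashes(byte[] left, byte[] right) {
        int length = Math.min(left.length, right.length);
        for (int i = 0; i < length; i++) {
            int cmp = Byte.compare(left[i], right[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(left.length, right.length);
    }

    public static void main(String[] args) {
        byte[] a = {0x01, 0x02};
        byte[] b = {0x01, 0x03};
        // Mirrors the usage pattern in the diff: a Comparator built from a key extractor
        // plus the byte[] comparator. Prints a negative value because a sorts before b.
        System.out.println(
                Comparator.comparing((byte[] h) -> h, HashComparatorSketch::compareHashes)
                        .compare(a, b));
    }
}
```

   The point of sorting by hash rather than by `nodeId` would be determinism across submissions: the hashes are stable identifiers, while node ids are only unique within one compilation of the job.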