dawidwys commented on a change in pull request #16019: URL: https://github.com/apache/flink/pull/16019#discussion_r642387239
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateAssignment.java
##########
@@ -166,32 +169,47 @@ public OperatorSubtaskState getSubtaskState(OperatorInstanceID instanceID) {
                 instanceID,
                 inputOperatorID,
                 getUpstreamAssignments(),
-                assignment -> assignment.outputSubtaskMappings,
+                assignment ->
+                        assignment.outputSubtaskMappings.get(
+                                getAssignmentIndex(
+                                        assignment.getDownstreamAssignments(),
+                                        this)),
                 assignment ->
                         assignment.getOutputMapping(
-                                Arrays.asList(assignment.getDownstreamAssignments())
-                                        .indexOf(this)),
+                                getAssignmentIndex(
+                                        assignment.getDownstreamAssignments(),
+                                        this)),

Review comment:
I thought about different approaches, but none of the ideas I had looked so much better that I considered implementing them. I added your suggestion (except that the check is in the lambda rather than in `getOutputMapping`). Take a look; I'm not sure it is much better :(

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
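
For illustration only, here is a minimal sketch of how a helper such as `getAssignmentIndex` might replace the inline `Arrays.asList(...).indexOf(this)` lookup from the diff. The `Assignment` class is a hypothetical stand-in for `TaskStateAssignment`, and the helper body, including the failure check, is an assumption rather than the code in this pull request.

```java
import java.util.Arrays;

public class AssignmentIndexSketch {

    /** Hypothetical stand-in for TaskStateAssignment; only object identity matters here. */
    static final class Assignment {
        final String name;

        Assignment(String name) {
            this.name = name;
        }
    }

    /**
     * Returns the position of {@code target} within {@code assignments}.
     * Assumption: a missing assignment is treated as a programming error,
     * so the sketch throws instead of returning -1.
     */
    static int getAssignmentIndex(Assignment[] assignments, Assignment target) {
        // Assignment does not override equals, so indexOf compares by identity.
        int index = Arrays.asList(assignments).indexOf(target);
        if (index < 0) {
            throw new IllegalStateException(
                    "Assignment " + target.name + " is not among the given assignments");
        }
        return index;
    }

    public static void main(String[] args) {
        Assignment first = new Assignment("first");
        Assignment second = new Assignment("second");
        Assignment[] downstream = {first, second};

        // Look up the position of one assignment among its peers.
        System.out.println(getAssignmentIndex(downstream, second));
    }
}
```

Centralizing the lookup in one helper keeps both lambdas in the diff consistent: the same index is used for `outputSubtaskMappings.get(...)` and for `getOutputMapping(...)`.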