dawidwys commented on a change in pull request #16437: URL: https://github.com/apache/flink/pull/16437#discussion_r667520506
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal( } } + private static class VerticesFinishedCache { + + private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>(); + private final Map<OperatorID, OperatorState> operatorStates; + + private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) { + this.operatorStates = operatorStates; + } + + public boolean getOrUpdate(ExecutionJobVertex vertex) { + return finishedCache.computeIfAbsent( + vertex.getJobVertexId(), + ignored -> calculateIfFinished(vertex, operatorStates)); + } + + private boolean calculateIfFinished( + ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) { + List<Boolean> operatorFinishedStates = + vertex.getOperatorIDs().stream() + .map(idPair -> checkOperatorFinished(operatorStates, idPair)) + .collect(Collectors.toList()); + + boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f); + if (!anyFinished) { + return false; + } else { + boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f); + if (!allFinished) { + throw new FlinkRuntimeException( + "Can not restore vertex " + + vertex.getJobVertexId() + + " which contain both finished and unfinished operators"); + } + return true; + } + } + + private boolean checkOperatorFinished( Review comment: I think it makes sense. Actually, the approach you provided is not equivalent. The code you provided does not account for a situation where you have both the user-defined operator ID and the generated operator ID, but the checkpoint has state associated with the generated operator ID. Your code would not check that scenario. However, I looked into `StateAssignmentOperation` and it follows what you suggested. I was wrong before. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org