gaoyunhaii commented on a change in pull request #16437: URL: https://github.com/apache/flink/pull/16437#discussion_r667680364
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()
+                                    + " which contain both finished and unfinished operators");
+                }
+                return true;
+            }
+        }
+
+        private boolean checkOperatorFinished(

Review comment:
Many thanks for the explanation! It seems the user-defined ID is only meant for one special case: a user wants to restore from a checkpoint but forgot to set a uid on the operator earlier, so they set the operator's ID directly to the one stored in the checkpoint. Given that, it should be safe to treat a user-defined ID as overriding the generated ID.
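To make the discussion concrete, here is a minimal, self-contained Java sketch (Java 16+ for records) of the same idea. It memoizes a per-vertex "finished" flag, requires that a vertex's operators be either all finished or all unfinished, and looks up each operator's restored state under its user-defined ID when one is set, falling back to the generated ID. Everything here is hypothetical and simplified. `IdPair`, `Vertex`, and the `Map<String, Boolean>` of restored "finished" flags stand in for Flink's `OperatorIDPair`, `ExecutionJobVertex`, and `OperatorState`. The override rule is an assumption based on the comment above, not the code that was merged in this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of the "is this vertex finished?" check; not Flink code. */
public class FinishedVertexSketch {

    /** Stand-in for an operator's ID pair: generated ID plus an optional user-defined ID. */
    public record IdPair(String generatedId, Optional<String> userDefinedId) {}

    /** Stand-in for a job vertex: a name plus the ID pairs of its chained operators. */
    public record Vertex(String name, List<IdPair> operators) {}

    public static final class VerticesFinishedCache {
        private final Map<String, Boolean> cache = new HashMap<>();
        // Restored checkpoint data, simplified to: operator ID -> "operator had finished".
        private final Map<String, Boolean> finishedByOperatorId;

        public VerticesFinishedCache(Map<String, Boolean> finishedByOperatorId) {
            this.finishedByOperatorId = finishedByOperatorId;
        }

        /** Computes the flag once per vertex name and memoizes it. */
        public boolean getOrUpdate(Vertex vertex) {
            return cache.computeIfAbsent(vertex.name(), ignored -> calculateIfFinished(vertex));
        }

        private boolean calculateIfFinished(Vertex vertex) {
            long finished = vertex.operators().stream().filter(this::isOperatorFinished).count();
            if (finished == 0) {
                return false; // no operator finished: an ordinary running vertex
            }
            if (finished != vertex.operators().size()) {
                // A vertex with mixed finished/unfinished operators cannot be restored consistently.
                throw new IllegalStateException(
                        "Can not restore vertex " + vertex.name()
                                + " which contains both finished and unfinished operators");
            }
            return true;
        }

        private boolean isOperatorFinished(IdPair ids) {
            // Assumption from the review discussion: a user-defined ID, when present,
            // overrides the generated one for the state lookup.
            String effectiveId = ids.userDefinedId().orElse(ids.generatedId());
            // Assumption: an operator with no restored entry counts as unfinished.
            return finishedByOperatorId.getOrDefault(effectiveId, false);
        }
    }

    public static void main(String[] args) {
        VerticesFinishedCache cache =
                new VerticesFinishedCache(Map.of("uid-a", true, "gen-b", true, "gen-c", false));
        Vertex vertex =
                new Vertex(
                        "v1",
                        List.of(
                                new IdPair("gen-a", Optional.of("uid-a")),
                                new IdPair("gen-b", Optional.empty())));
        System.out.println(cache.getOrUpdate(vertex)); // prints "true"
    }
}
```

In this model, an operator with generated ID `gen-b` but user-defined ID `uid-x` is looked up under `uid-x`. It is therefore treated as unfinished even though `gen-b` is marked finished. Memoizing through `HashMap.computeIfAbsent` means the check runs at most once per vertex. If the mapping function throws, no entry is cached.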