gaoyunhaii commented on a change in pull request #16437:
URL: https://github.com/apache/flink/pull/16437#discussion_r667680364



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()
+                                    + " which contains both finished and unfinished operators");
+                }
+                return true;
+            }
+        }
+
+        private boolean checkOperatorFinished(

Review comment:
       Many thanks for the explanation! It seems the user-defined ID is only meant for the special case where users want to restore from a checkpoint but did not set a uid for the operator beforehand. The user can then set the operator's ID directly to the one recorded in the checkpoint, so it is safe to treat the user-defined ID as overriding the generated ID.
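To make the rule in the comment concrete, here is a minimal, Flink-free Java sketch. `IdPair` is a hypothetical stand-in for the generated/user-defined operator ID pair, plain strings replace `OperatorID`, and a `Map<String, Boolean>` replaces the `Map<OperatorID, OperatorState>` from the diff. Treating an operator with no recorded state as "not finished" is an assumption of this sketch, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: a user-defined operator ID, when present, overrides the generated one. */
public class OperatorIdResolution {

    /** Hypothetical stand-in for a (generated ID, optional user-defined ID) pair. */
    static final class IdPair {
        final String generatedId;
        final Optional<String> userDefinedId;

        IdPair(String generatedId, Optional<String> userDefinedId) {
            this.generatedId = generatedId;
            this.userDefinedId = userDefinedId;
        }
    }

    /** Prefer the user-defined ID; fall back to the generated ID otherwise. */
    static String resolveId(IdPair pair) {
        return pair.userDefinedId.orElse(pair.generatedId);
    }

    /** Looks up the finished flag under the resolved ID; missing state counts as not finished. */
    static boolean isOperatorFinished(Map<String, Boolean> finishedById, IdPair pair) {
        return finishedById.getOrDefault(resolveId(pair), false);
    }

    public static void main(String[] args) {
        // The checkpoint was taken before a uid was set, so its state is keyed
        // by the ID that was generated at that time ("old-gen-1").
        Map<String, Boolean> finishedById = new HashMap<>();
        finishedById.put("old-gen-1", true);

        // After the job changed, the generated ID differs; the user pins the
        // operator ID to the one stored in the checkpoint.
        IdPair pinned = new IdPair("new-gen-1", Optional.of("old-gen-1"));
        IdPair unpinned = new IdPair("new-gen-1", Optional.empty());

        System.out.println(isOperatorFinished(finishedById, pinned));   // true
        System.out.println(isOperatorFinished(finishedById, unpinned)); // false
    }
}
```

Only the user-defined ID matches the key the checkpoint was written under, which is why letting it override the generated ID is safe in this scenario.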




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.