dawidwys commented on a change in pull request #16437:
URL: https://github.com/apache/flink/pull/16437#discussion_r667520506



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()
+                                    + " which contain both finished and unfinished operators");
+                }
+                return true;
+            }
+        }
+
+        private boolean checkOperatorFinished(

Review comment:
       I think it makes sense.

   To be precise, the approach you suggested is not equivalent: it does not account for the case where an operator has both a user-defined operator ID and a generated operator ID, but the checkpoint holds state under the generated ID. However, I looked into `StateAssignmentOperation`, and it follows the approach you suggested, so I was wrong before.
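   To make the scenario concrete, here is a minimal, hypothetical model in plain Java. It is not Flink code: IDs are plain Strings, `IdPair` stands in for an operator's generated/user-defined ID pair, and `lookupSingleKey` / `lookupWithFallback` are illustrative names. It assumes the suggested approach resolves a single key (the user-defined ID if set, otherwise the generated one), while the alternative falls back to the generated ID when nothing is stored under the user-defined one. It makes no claim about how `StateAssignmentOperation` is actually implemented.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model: IDs are plain Strings, and these types and
// methods are NOT Flink APIs. They only illustrate the lookup scenario from
// the review comment.
class OperatorStateLookupSketch {

    // Stand-in for an operator's ID pair: a generated ID plus an optional
    // user-defined ID.
    static final class IdPair {
        final String generatedId;
        final Optional<String> userDefinedId;

        IdPair(String generatedId, Optional<String> userDefinedId) {
            this.generatedId = generatedId;
            this.userDefinedId = userDefinedId;
        }
    }

    // Strategy A (assumed to resemble the suggestion): resolve a single key,
    // the user-defined ID if present, otherwise the generated ID.
    static Optional<String> lookupSingleKey(Map<String, String> states, IdPair ids) {
        String key = ids.userDefinedId.orElse(ids.generatedId);
        return Optional.ofNullable(states.get(key));
    }

    // Strategy B: try the user-defined ID first, then fall back to the
    // generated ID if no state is stored under the user-defined one.
    static Optional<String> lookupWithFallback(Map<String, String> states, IdPair ids) {
        // Optional.map yields an empty Optional when states.get returns null.
        Optional<String> byUserDefined = ids.userDefinedId.map(states::get);
        if (byUserDefined.isPresent()) {
            return byUserDefined;
        }
        return Optional.ofNullable(states.get(ids.generatedId));
    }

    public static void main(String[] args) {
        // The scenario from the comment: both IDs are set, but the checkpoint
        // holds the state under the generated ID only.
        IdPair ids = new IdPair("generated-1", Optional.of("user-1"));
        Map<String, String> checkpoint = Map.of("generated-1", "operator-state");

        System.out.println(lookupSingleKey(checkpoint, ids));    // Optional.empty
        System.out.println(lookupWithFallback(checkpoint, ids)); // Optional[operator-state]
    }
}
```

   With both IDs set and state stored only under the generated ID, the single-key lookup finds nothing while the fallback finds the state; this is the case the comment says the suggested code does not check. When only a generated ID exists, both strategies behave the same.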




-- 
This is an automated message from the Apache Git Service.