gaoyunhaii commented on a change in pull request #16437:
URL: https://github.com/apache/flink/pull/16437#discussion_r667282368



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong 
restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new 
HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> 
operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> 
operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> 
checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f 
-> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = 
operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()
+                                    + " which contain both finished and 
unfinished operators");
+                }
+                return true;
+            }
+        }
+
+        private boolean checkOperatorFinished(

Review comment:
       Perhaps we could simplify this method to the following?
   ```
   OperatorID operatorId =
           idPair.getUserDefinedOperatorID().orElse(idPair.getGeneratedOperatorID());
   return Optional.ofNullable(operatorStates.get(operatorId))
           .map(OperatorState::isFullyFinished)
           .orElse(false);
   ``` 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong 
restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new 
HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> 
operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> 
operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> 
checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f 
-> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = 
operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()
+                                    + " which contain both finished and 
unfinished operators");
+                }
+                return true;
+            }
+        }
+
+        private boolean checkOperatorFinished(
+                Map<OperatorID, OperatorState> operatorStates, OperatorIDPair 
idPair) {
+            Optional<OperatorState> stateByUserId =
+                    idPair.getUserDefinedOperatorID()
+                            .flatMap(id -> 
Optional.ofNullable(operatorStates.get(id)));
+
+            return stateByUserId
+                    .map(OperatorState::isFullyFinished)
+                    .orElseGet(
+                            () ->
+                                    Optional.ofNullable(
+                                                    operatorStates.get(
+                                                            
idPair.getGeneratedOperatorID()))
+                                            
.map(OperatorState::isFullyFinished)
+                                            .orElse(false));
+        }
+    }
+
+    private void validateFinishedOperators(
+            Set<ExecutionJobVertex> tasks, Map<OperatorID, OperatorState> 
operatorStates) {
+
+        VerticesFinishedCache verticesFinishedCache = new 
VerticesFinishedCache(operatorStates);
+        for (ExecutionJobVertex task : tasks) {
+            boolean vertexFinished = verticesFinishedCache.getOrUpdate(task);
+
+            if (vertexFinished) {
+                boolean allPredecessorsFinished =
+                        task.getInputs().stream()
+                                .map(IntermediateResult::getProducer)
+                                .allMatch(verticesFinishedCache::getOrUpdate);
+
+                if (!allPredecessorsFinished) {
+                    throw new FlinkRuntimeException(
+                            "Illegal JobGraph modification. Cannot run a 
program with finished"
+                                    + " vertices predeceased with running 
ones. Task vertex "
+                                    + task.getJobVertexId()

Review comment:
       Similarly, perhaps we could also include `task.getName()` in this error message?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1600,6 +1605,85 @@ private OptionalLong 
restoreLatestCheckpointedStateInternal(
         }
     }
 
+    private static class VerticesFinishedCache {
+
+        private final Map<JobVertexID, Boolean> finishedCache = new 
HashMap<>();
+        private final Map<OperatorID, OperatorState> operatorStates;
+
+        private VerticesFinishedCache(Map<OperatorID, OperatorState> 
operatorStates) {
+            this.operatorStates = operatorStates;
+        }
+
+        public boolean getOrUpdate(ExecutionJobVertex vertex) {
+            return finishedCache.computeIfAbsent(
+                    vertex.getJobVertexId(),
+                    ignored -> calculateIfFinished(vertex, operatorStates));
+        }
+
+        private boolean calculateIfFinished(
+                ExecutionJobVertex vertex, Map<OperatorID, OperatorState> 
operatorStates) {
+            List<Boolean> operatorFinishedStates =
+                    vertex.getOperatorIDs().stream()
+                            .map(idPair -> 
checkOperatorFinished(operatorStates, idPair))
+                            .collect(Collectors.toList());
+
+            boolean anyFinished = operatorFinishedStates.stream().anyMatch(f 
-> f);
+            if (!anyFinished) {
+                return false;
+            } else {
+                boolean allFinished = 
operatorFinishedStates.stream().allMatch(f -> f);
+                if (!allFinished) {
+                    throw new FlinkRuntimeException(
+                            "Can not restore vertex "
+                                    + vertex.getJobVertexId()

Review comment:
       Perhaps we could also include `vertex.getName()` in this error message, so the log shows which vertex is affected?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to