gaoyunhaii commented on a change in pull request #16437: URL: https://github.com/apache/flink/pull/16437#discussion_r667282368
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal( } } + private static class VerticesFinishedCache { + + private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>(); + private final Map<OperatorID, OperatorState> operatorStates; + + private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) { + this.operatorStates = operatorStates; + } + + public boolean getOrUpdate(ExecutionJobVertex vertex) { + return finishedCache.computeIfAbsent( + vertex.getJobVertexId(), + ignored -> calculateIfFinished(vertex, operatorStates)); + } + + private boolean calculateIfFinished( + ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) { + List<Boolean> operatorFinishedStates = + vertex.getOperatorIDs().stream() + .map(idPair -> checkOperatorFinished(operatorStates, idPair)) + .collect(Collectors.toList()); + + boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f); + if (!anyFinished) { + return false; + } else { + boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f); + if (!allFinished) { + throw new FlinkRuntimeException( + "Can not restore vertex " + + vertex.getJobVertexId() + + " which contain both finished and unfinished operators"); + } + return true; + } + } + + private boolean checkOperatorFinished( Review comment: Perhaps we could simplify this method to the following?
``` OperatorID operatorId = idPair.getUserDefinedOperatorID().orElse(idPair.getGeneratedOperatorID()); return Optional.ofNullable(operatorStates.get(operatorId)) .map(OperatorState::isFullyFinished) .orElse(false); ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal( } } + private static class VerticesFinishedCache { + + private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>(); + private final Map<OperatorID, OperatorState> operatorStates; + + private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) { + this.operatorStates = operatorStates; + } + + public boolean getOrUpdate(ExecutionJobVertex vertex) { + return finishedCache.computeIfAbsent( + vertex.getJobVertexId(), + ignored -> calculateIfFinished(vertex, operatorStates)); + } + + private boolean calculateIfFinished( + ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) { + List<Boolean> operatorFinishedStates = + vertex.getOperatorIDs().stream() + .map(idPair -> checkOperatorFinished(operatorStates, idPair)) + .collect(Collectors.toList()); + + boolean anyFinished = operatorFinishedStates.stream().anyMatch(f -> f); + if (!anyFinished) { + return false; + } else { + boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f); + if (!allFinished) { + throw new FlinkRuntimeException( + "Can not restore vertex " + + vertex.getJobVertexId() + + " which contain both finished and unfinished operators"); + } + return true; + } + } + + private boolean checkOperatorFinished( + Map<OperatorID, OperatorState> operatorStates, OperatorIDPair idPair) { + Optional<OperatorState> stateByUserId = + idPair.getUserDefinedOperatorID() + .flatMap(id -> Optional.ofNullable(operatorStates.get(id))); + + return stateByUserId + .map(OperatorState::isFullyFinished) + .orElseGet( + () -> + Optional.ofNullable( + 
operatorStates.get( + idPair.getGeneratedOperatorID())) + .map(OperatorState::isFullyFinished) + .orElse(false)); + } + } + + private void validateFinishedOperators( + Set<ExecutionJobVertex> tasks, Map<OperatorID, OperatorState> operatorStates) { + + VerticesFinishedCache verticesFinishedCache = new VerticesFinishedCache(operatorStates); + for (ExecutionJobVertex task : tasks) { + boolean vertexFinished = verticesFinishedCache.getOrUpdate(task); + + if (vertexFinished) { + boolean allPredecessorsFinished = + task.getInputs().stream() + .map(IntermediateResult::getProducer) + .allMatch(verticesFinishedCache::getOrUpdate); + + if (!allPredecessorsFinished) { + throw new FlinkRuntimeException( + "Illegal JobGraph modification. Cannot run a program with finished" + + " vertices predeceased with running ones. Task vertex " + + task.getJobVertexId() Review comment: Similarly, perhaps we could also include `task.getName()` in this exception message? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1600,6 +1605,85 @@ private OptionalLong restoreLatestCheckpointedStateInternal( } } + private static class VerticesFinishedCache { + + private final Map<JobVertexID, Boolean> finishedCache = new HashMap<>(); + private final Map<OperatorID, OperatorState> operatorStates; + + private VerticesFinishedCache(Map<OperatorID, OperatorState> operatorStates) { + this.operatorStates = operatorStates; + } + + public boolean getOrUpdate(ExecutionJobVertex vertex) { + return finishedCache.computeIfAbsent( + vertex.getJobVertexId(), + ignored -> calculateIfFinished(vertex, operatorStates)); + } + + private boolean calculateIfFinished( + ExecutionJobVertex vertex, Map<OperatorID, OperatorState> operatorStates) { + List<Boolean> operatorFinishedStates = + vertex.getOperatorIDs().stream() + .map(idPair -> checkOperatorFinished(operatorStates, idPair)) + .collect(Collectors.toList()); + + boolean anyFinished = 
operatorFinishedStates.stream().anyMatch(f -> f); + if (!anyFinished) { + return false; + } else { + boolean allFinished = operatorFinishedStates.stream().allMatch(f -> f); + if (!allFinished) { + throw new FlinkRuntimeException( + "Can not restore vertex " + + vertex.getJobVertexId() Review comment: Perhaps we could also include `vertex.getName()` in this exception message? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org