XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r574568680
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,56 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() - .handle( - (jobstatus, throwable) -> { - if (throwable != null) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: {}", - jobGraph.getJobID(), - throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", - jobGraph.getJobID(), - jobstatus); - throw new CompletionException( - new FlinkException( - "Reached state " - + jobstatus - + " instead of FINISHED.")); - } - return jobstatus; - }); - return savepointFuture - .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .thenCompose( + path -> + executionGraphTerminationFuture + .handle( + (executionStates, throwable) -> { + Set<ExecutionState> nonFinishedStates = + extractNonFinishedStates( + executionStates); + if (throwable != null) { + log.info( + "Failed during stopping job {} with a savepoint. 
Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); + throw new CompletionException(throwable); + } else if (!nonFinishedStates.isEmpty()) { + log.info( + "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); + FlinkException + inconsistentFinalStateException = + new FlinkException( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobGraph + .getJobID())); + executionGraph.failGlobal( + inconsistentFinalStateException); Review comment: Cool, I rebased the branch and reverted back to `handleAsync`. One question: is there a reason to keep the leftover code from the IT test you reverted? If not, I would add a cleanup hotfix to the branch to remove things like `SavepointITCase.ischeckpointcoordinatorshutdownError` and the extensions you introduced for `InfiniteTestSource`. It looks like they were only used for `SavepointITCase.testStopSavepointWithBoundedInputConcurrently`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org