XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r574522921
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handle(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(
+                                                                    executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during stopping job {} with a savepoint. Reason: {}",
+                                                                jobGraph.getJobID(),
+                                                                throwable.getMessage());
+                                                        throw new CompletionException(throwable);
+                                                    } else if (!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}",
+                                                                jobGraph.getJobID(),
+                                                                nonFinishedStates);
+                                                        FlinkException
+                                                                inconsistentFinalStateException =
+                                                                        new FlinkException(
+                                                                                String.format(
+                                                                                        "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                                                                        jobGraph
+                                                                                                .getJobID()));
+                                                        executionGraph.failGlobal(
+                                                                inconsistentFinalStateException);

Review comment:
   Interesting finding. It was actually implemented that way before (using `handleAsync` with the `mainThreadExecutor`). However, I ran into issues with `SavepointITCase.testStopSavepointWithBoundedInputConcurrently`, which failed intermittently. Switching to `handle` fixed it, hence the change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
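The threading question behind the `handle` vs. `handleAsync` discussion can be illustrated with plain `CompletableFuture`. The following is a minimal, self-contained sketch, not Flink code: `ExecutionState` is a reduced stand-in enum, `combineAll` and `nonFinishedStates` are hypothetical helpers that only mirror the roles `FutureUtils.combineAll` and `extractNonFinishedStates` play in the diff, and a single-thread executor stands in for the `mainThreadExecutor`. It shows the JDK behaviour that matters here: `handle` runs its callback immediately in the calling thread when the future is already complete (otherwise in whichever thread completes it), while `handleAsync(fn, executor)` always submits the callback to the given executor. It does not reproduce the `SavepointITCase` failure.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class HandleVsHandleAsync {

    // Hypothetical stand-in for Flink's ExecutionState; only a few values are modelled.
    enum ExecutionState { FINISHED, CANCELED, FAILED }

    // Hypothetical helper: completes with all results once every input future is done.
    // It mirrors what the diff uses FutureUtils.combineAll for, not its actual implementation.
    static <T> CompletableFuture<Collection<T>> combineAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(
                        ignored ->
                                futures.stream()
                                        .map(CompletableFuture::join)
                                        .collect(Collectors.toList()));
    }

    // Hypothetical helper: keeps every terminal state other than FINISHED.
    static Set<ExecutionState> nonFinishedStates(Collection<ExecutionState> states) {
        Set<ExecutionState> result = EnumSet.noneOf(ExecutionState.class);
        for (ExecutionState state : states) {
            if (state != ExecutionState.FINISHED) {
                result.add(state);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A single-thread executor plays the part of the scheduler's main-thread executor.
        ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<ExecutionState>> terminalStates =
                    Arrays.asList(
                            CompletableFuture.completedFuture(ExecutionState.FINISHED),
                            CompletableFuture.completedFuture(ExecutionState.CANCELED));
            CompletableFuture<Collection<ExecutionState>> allTerminated =
                    combineAll(terminalStates);

            // handle: the future is already complete, so the callback runs right away
            // in the calling thread.
            String handleThread =
                    allTerminated.handle((states, t) -> Thread.currentThread().getName()).join();

            // handleAsync: the callback is always submitted to the given executor.
            String handleAsyncThread =
                    allTerminated
                            .handleAsync(
                                    (states, t) -> Thread.currentThread().getName(),
                                    mainThreadExecutor)
                            .join();

            Set<ExecutionState> notFinished =
                    allTerminated.handle((states, t) -> nonFinishedStates(states)).join();

            String caller = Thread.currentThread().getName();
            System.out.println(handleThread.equals(caller)); // true
            System.out.println(handleAsyncThread.equals(caller)); // false
            System.out.println(notFinished); // [CANCELED]
        } finally {
            mainThreadExecutor.shutdown();
        }
    }
}
```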