rmetzger commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r576025634
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() Review comment: (Note to future self) We are not tracking this future anymore, but relying on the completion of all execution's termination futures. This seems to be sufficient, because the ExecutionGraph is doing the same on all relevant operations (suspend, failJob) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org