rmetzger commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r575036273



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(
+                                                                    executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during stopping job {} with a savepoint. Reason: {}",
+                                                                jobGraph.getJobID(),
+                                                                throwable.getMessage());
+                                                        throw new CompletionException(throwable);
+                                                    } else if (!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}",
+                                                                jobGraph.getJobID(),
+                                                                nonFinishedStates);
+                                                        FlinkException
+                                                                inconsistentFinalStateException =
+                                                                        new FlinkException(
+                                                                                String.format(
+                                                                                        "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                                                                        jobGraph
+                                                                                                .getJobID()));
+                                                        executionGraph.failGlobal(
+                                                                inconsistentFinalStateException);

Review comment:
       Have you considered other means of triggering a global failover? This approach seems to work, but I'm not sure it is a good idea to make a detour via the execution graph to notify the scheduler, from within the scheduler, that we want a global failover.
   
   We could also introduce an abstract "triggerGlobalFailover()" method into 
the SchedulerBase for this purpose:
   ```diff
   diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
   index edd1da436c3..aa5cb248d48 100644
   --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
   +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
   @@ -333,6 +333,11 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
            schedulingStrategy.onPartitionConsumable(partitionId);
        }
   
   +    @Override
   +    protected void triggerGlobalFailover(Throwable cause) {
   +        handleGlobalFailure(cause);
   +    }
   +
        // ------------------------------------------------------------------------
        // SchedulerOperations
        // ------------------------------------------------------------------------
   diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
   index 4144b0dc5b2..ef6147eb187 100644
   --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
   +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
   @@ -950,7 +950,7 @@ public abstract class SchedulerBase implements SchedulerNG {
                                                                                            "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
                                                                                            jobGraph
                                                                                                    .getJobID()));
   -                                                        executionGraph.failGlobal(
   +                                                        triggerGlobalFailover(
                                                                    inconsistentFinalStateException);
   
                                                            throw new CompletionException(
   @@ -973,6 +973,8 @@ public abstract class SchedulerBase implements SchedulerNG {
                            mainThreadExecutor);
        }
   
   +    protected abstract void triggerGlobalFailover(Throwable cause);
   +
        private static Set<ExecutionState> extractNonFinishedStates(
   ```
   
   I'm not saying you should change the code; I would rather hear your opinion and discuss this first.
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(

Review comment:
       I wanted to suggest moving the `.handleAsync()` up to the definition of the `executionGraphTerminationFuture`, as it would reduce the indentation by one level and move conceptually related things closer to each other.
   However, the `testStopWithSavepointFailingWithDeclinedCheckpoint` test started failing when I tried this (it isn't clear to me why that's happening).
   Do you happen to know why?
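   Roughly the shape I had in mind (only a sketch; the logging and the global-failover handling from the current patch are left out for brevity):
   ```java
   // attach the termination handling directly to the combined future, so the
   // future and the code reacting to it sit next to each other
   final CompletableFuture<Set<ExecutionState>> nonFinishedStatesFuture =
           FutureUtils.combineAll(
                           StreamSupport.stream(
                                           executionGraph.getAllExecutionVertices().spliterator(),
                                           false)
                                   .map(ExecutionVertex::getCurrentExecutionAttempt)
                                   .map(Execution::getTerminalStateFuture)
                                   .collect(Collectors.toList()))
                   .handleAsync(
                           (executionStates, throwable) -> {
                               if (throwable != null) {
                                   throw new CompletionException(throwable);
                               }
                               return extractNonFinishedStates(executionStates);
                           },
                           mainThreadExecutor);
   ```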

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -539,6 +555,171 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
         }
     }
 
+    @Test
+    public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new SnapshotFailingInfiniteTestSource(),
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by the CheckpointFailureManager
+                2,
+                assertInSnapshotCreationFailure());
+    }
+
+    @Test
+    public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new InfiniteTestSource() {
+                    @Override
+                    public void cancel() {
+                        throw new RuntimeException(
+                                "Expected RuntimeException after snapshot creation.");
+                    }
+                },
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by SchedulerBase.stopWithSavepoint
+                2,
+                assertAfterSnapshotCreationFailure());
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
+        return (jobId, actualException) -> {
+            Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(actualException, FlinkException.class);
+            assertTrue(actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobId)));
+        };
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
+        return (ignored, actualException) -> {
+            Optional<CheckpointException> actualFailureCause =
+                    ExceptionUtils.findThrowable(actualException, CheckpointException.class);
+            assertTrue(actualFailureCause.isPresent());
+            assertThat(
+                    actualFailureCause.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.JOB_FAILOVER_REGION));
+        };
+    }
+
+    private static OneShotLatch failingPipelineLatch;
+    private static OneShotLatch succeedingPipelineLatch;
+
+    /**
+     * FLINK-21030
+     *
+     * <p>Tests the handling of a failure that happened while stopping an embarrassingly parallel
+     * job with a Savepoint. The test expects that the stopping action fails and all executions are
+     * in state {@code RUNNING} afterwards.
+     *
+     * @param failingSource the failing {@link SourceFunction} used in one of the two pipelines.
+     * @param expectedMaximumNumberOfRestarts the maximum number of restarts allowed by the restart
+     *     strategy.
+     * @param exceptionAssertion asserts the client-call exception to verify that the right error
+     *     was handled.
+     * @see SavepointITCase#failingPipelineLatch The latch used to trigger the successful start of
+     *     the later on failing pipeline.
+     * @see SavepointITCase#succeedingPipelineLatch The latch that triggers the successful start of
+     *     the succeeding pipeline.
+     * @throws Exception if an error occurred while running the test.
+     */
+    private static void testStopWithFailingSourceInOnePipeline(
+            InfiniteTestSource failingSource,
+            File savepointDir,
+            int expectedMaximumNumberOfRestarts,
+            BiConsumer<JobID, ExecutionException> exceptionAssertion)
+            throws Exception {
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder().build());
+
+        failingPipelineLatch = new OneShotLatch();
+        succeedingPipelineLatch = new OneShotLatch();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.getConfig()
+                .setRestartStrategy(
+                        RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 10));

Review comment:
       ```suggestion
                           RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0));
   ```
   Let's not waste CI time ;) 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));

Review comment:
       nit: maybe move the creation of this list outside the future creation to 
improve readability
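   For example, something along these lines (just a sketch; the local variable name is made up):
   ```java
   // collect the terminal-state futures of all current execution attempts first ...
   final List<CompletableFuture<ExecutionState>> terminalExecutionStateFutures =
           StreamSupport.stream(
                           executionGraph.getAllExecutionVertices().spliterator(), false)
                   .map(ExecutionVertex::getCurrentExecutionAttempt)
                   .map(Execution::getTerminalStateFuture)
                   .collect(Collectors.toList());

   // ... and only then combine them into the single termination future
   final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
           FutureUtils.combineAll(terminalExecutionStateFutures);
   ```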

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()

Review comment:
       We are not tracking this future anymore, but relying on the completion of all executions' termination futures.
   
   This seems to be sufficient, because the ExecutionGraph does the same on all relevant operations (suspend, failJob).

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this is 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+
+        // the restart due to failed checkpoint handling triggering a global 
job fail-over
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(1));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling 
triggering a global job
+                            // fail-over
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+                            
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+                        },
+                        mainThreadExecutor)
+                .get();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        }
+
+        // we have to wait for the main executor to be finished with 
restarting tasks
+        singleThreadExecutorService.shutdown();
+        singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        // we don't allow any restarts during the happy path
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final Set<ExecutionAttemptID> executionAttemptIds =
+                StreamSupport.stream(
+                                scheduler
+                                        .getExecutionGraph()
+                                        .getAllExecutionVertices()
+                                        .spliterator(),
+                                false)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        executionAttemptIds.forEach(
+                id ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(), id, 
ExecutionState.RUNNING)));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(executionAttemptIds.toArray()));
+
+        executionAttemptIds.forEach(
+                id ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(), id, 
ExecutionState.FINISHED)));
+
+        try {
+            assertThat(
+                    stopWithSavepointFuture.get(),
+                    startsWith(String.format("file:%s", savepointFolder)));
+        } catch (ExecutionException e) {
+            fail("No exception is expected.");
+        }

Review comment:
       Can't you just remove the try/catch and let the exception propagate out of the test? Then we would also get the stack trace and message in the CI output.
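   Something like this (sketch):
   ```java
   // without the try/catch the unexpected exception simply fails the test,
   // and the full stack trace shows up in the CI output
   assertThat(
           stopWithSavepointFuture.get(),
           startsWith(String.format("file:%s", savepointFolder)));
   ```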

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this is 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));

Review comment:
       Do we need to enforce order here? 
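   If not, the assertion could use the same matcher as the other tests to avoid depending on the notification order (sketch):
   ```java
   assertThat(
           "Both of the executions were notified about the aborted checkpoint.",
           executionAttemptIdsWithAbortedCheckpoint,
           containsInAnyOrder(succeedingExecutionAttemptId, failingExecutionAttemptId));
   ```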

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(

Review comment:
       If we are going to change the behavior here, we also need to change the 
`FutureUtils.combineAll()` to a `waitForAll()`, to get all the final states.
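   Roughly what I mean, as a sketch (assuming `FutureUtils.waitForAll` and a `terminalStateFutures` list like the one built above; the terminal states are then read off the vertices afterwards):
   ```java
   // waitForAll completes once every future is done, even if some of them
   // completed exceptionally, so no terminal state is lost along the way
   final CompletableFuture<Set<ExecutionState>> terminalExecutionStates =
           FutureUtils.waitForAll(terminalStateFutures)
                   .thenApply(
                           ignored ->
                                   StreamSupport.stream(
                                                   executionGraph
                                                           .getAllExecutionVertices()
                                                           .spliterator(),
                                                   false)
                                           .map(ExecutionVertex::getExecutionState)
                                           .collect(Collectors.toSet()));
   ```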

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(
+                                                                    executionStates);
+                                                    if (throwable != null) {

Review comment:
       Under which circumstances are we running into this condition?
   
   (Background: we should end up in this condition if the termination future of the `Execution` is completed exceptionally. I couldn't spot where that's happening. But if a future code change ever completes that future with a throwable, we might not properly go into a global failover.)

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception {
+        // initially, we don't allow any restarts since the first phase (savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions were notified about the completed checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded; a failure happens in the second phase when finishing
+        // the tasks. That's why the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint abortion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2);
+        final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location");
+
+        // we need to wait for the confirmations to be collected since this is running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions were notified about the aborted checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+
+        // the restart due to failed checkpoint handling triggering a global job fail-over
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService here
+        final ScheduledExecutorService singleThreadExecutorService =

Review comment:
       Why can't we use `ComponentMainThreadExecutorServiceAdapter.forMainThread()`?
   
   The proper solution would be passing the checkpoint coordinator or its timer
thread from the test into it. But introducing this change seems beyond the
scope of this PR.
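   For what it's worth, a hypothetical sketch of the single-thread pattern the quoted
test seems to be after: all checkpoint-related steps are funnelled through one
executor thread so the failure handling cannot interleave with the checkpoint
coordination. The quoted diff is cut off before the executor is actually created, so
the class, the `newSingleThreadScheduledExecutor()` choice, and the printed steps
below are illustrative assumptions, not the test's real code:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SingleThreadCheckpointingSketch {

    public static void main(String[] args) throws Exception {
        final ScheduledExecutorService singleThreadExecutorService =
                Executors.newSingleThreadScheduledExecutor();
        try {
            // the "checkpoint coordination" step and the "failure handling" step
            // run on the same thread, so their relative order is deterministic
            singleThreadExecutorService
                    .submit(() -> System.out.println(
                            "coordinate checkpoint on " + Thread.currentThread().getName()))
                    .get();
            singleThreadExecutorService
                    .submit(() -> System.out.println(
                            "handle failure on " + Thread.currentThread().getName()))
                    .get();
        } finally {
            singleThreadExecutorService.shutdownNow();
            singleThreadExecutorService.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```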

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception {
+        // initially, we don't allow any restarts since the first phase (savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions were notified about the completed checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded; a failure happens in the second phase when finishing
+        // the tasks. That's why the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));

Review comment:
       This feels like the test depends on some implementation details of the
test's subject. It is likely that somebody who changes the scheduler and
subsequently runs into a test failure here will just adjust this number to make
the test pass again. Can you add a comment on which two scheduled tasks are
expected to be in there?
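   For illustration, the expectation could also be spelled out in the assertion
itself, e.g. something along these lines (reusing the test's own helpers; the
message text is my suggestion, based on the "local failure handling and global job
fail-over" comment in the quoted test, not existing code):

```java
// sketch of a self-documenting variant of the assertion (message text is a suggestion)
assertThat(
        "Expected two scheduled restarts: the local restart of the failed execution "
                + "and the restart scheduled by the global fail-over of stop-with-savepoint.",
        taskRestartExecutor.getNonPeriodicScheduledTask(),
        hasSize(2));
```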




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

