rmetzger commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r575036273
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() - .handle( - (jobstatus, throwable) -> { - if (throwable != null) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: {}", - jobGraph.getJobID(), - throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", - jobGraph.getJobID(), - jobstatus); - throw new CompletionException( - new FlinkException( - "Reached state " - + jobstatus - + " instead of FINISHED.")); - } - return jobstatus; - }); - return savepointFuture - .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .thenCompose( + path -> + executionGraphTerminationFuture + .handleAsync( + (executionStates, throwable) -> { + Set<ExecutionState> nonFinishedStates = + extractNonFinishedStates( + executionStates); + if (throwable != null) { + log.info( + "Failed during stopping job {} with a savepoint. Reason: {}", + jobGraph.getJobID(), + throwable.getMessage()); + throw new CompletionException(throwable); + } else if (!nonFinishedStates.isEmpty()) { + log.info( + "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}", + jobGraph.getJobID(), + nonFinishedStates); + FlinkException + inconsistentFinalStateException = + new FlinkException( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobGraph + .getJobID())); + executionGraph.failGlobal( + inconsistentFinalStateException); Review comment: Have you considered other means of triggering a global failover? This approach seems to work, but I'm not sure if it is a good idea to make a detour via the execution graph, from within the scheduler, to notify the scheduler that we want a global failover.
We could also introduce an abstract "triggerGlobalFailover()" method into the SchedulerBase for this purpose: ```diff diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index edd1da436c3..aa5cb248d48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -333,6 +333,11 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio schedulingStrategy.onPartitionConsumable(partitionId); } + @Override + protected void triggerGlobalFailover(Throwable cause) { + handleGlobalFailure(cause); + } + // ------------------------------------------------------------------------ // SchedulerOperations // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 4144b0dc5b2..ef6147eb187 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -950,7 +950,7 @@ public abstract class SchedulerBase implements SchedulerNG { "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", jobGraph .getJobID())); - executionGraph.failGlobal( + triggerGlobalFailover( inconsistentFinalStateException); throw new CompletionException( @@ -973,6 +973,8 @@ public abstract class SchedulerBase implements SchedulerNG { mainThreadExecutor); } + protected abstract void triggerGlobalFailover(Throwable cause); + private static Set<ExecutionState> extractNonFinishedStates( ``` I'm not saying you should change the code, I would rather like to hear your opinion and discuss this first. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() - .handle( - (jobstatus, throwable) -> { - if (throwable != null) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: {}", - jobGraph.getJobID(), - throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info( - "Failed during stopping job {} with a savepoint. 
Reason: Reached state {} instead of FINISHED.", - jobGraph.getJobID(), - jobstatus); - throw new CompletionException( - new FlinkException( - "Reached state " - + jobstatus - + " instead of FINISHED.")); - } - return jobstatus; - }); - return savepointFuture - .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .thenCompose( + path -> + executionGraphTerminationFuture + .handleAsync( Review comment: I wanted to suggest moving the .handleAsync() up to the definition of the `executionGraphTerminationFuture`, as it would reduce the indentation by one level, and move things conceptually related closer to each other. However, the `testStopWithSavepointFailingWithDeclinedCheckpoint` test started failing (it isn't clear to my why that's happening). Do you happen to know why this happens? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ########## @@ -539,6 +555,171 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { } } + @Test + public void testStopWithSavepointFailingInSnapshotCreation() throws Exception { + testStopWithFailingSourceInOnePipeline( + new SnapshotFailingInfiniteTestSource(), + folder.newFolder(), + // two restarts expected: + // 1. task failure restart + // 2. job failover triggered by the CheckpointFailureManager + 2, + assertInSnapshotCreationFailure()); + } + + @Test + public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception { + testStopWithFailingSourceInOnePipeline( + new InfiniteTestSource() { + @Override + public void cancel() { + throw new RuntimeException( + "Expected RuntimeException after snapshot creation."); + } + }, + folder.newFolder(), + // two restarts expected: + // 1. task failure restart + // 2. job failover triggered by SchedulerBase.stopWithSavepoint + 2, + assertAfterSnapshotCreationFailure()); + } + + private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() { + return (jobId, actualException) -> { + Optional<FlinkException> actualFlinkException = + ExceptionUtils.findThrowable(actualException, FlinkException.class); + assertTrue(actualFlinkException.isPresent()); + assertThat( + actualFlinkException.get().getMessage(), + is( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobId))); + }; + } + + private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() { + return (ignored, actualException) -> { + Optional<CheckpointException> actualFailureCause = + ExceptionUtils.findThrowable(actualException, CheckpointException.class); + assertTrue(actualFailureCause.isPresent()); + assertThat( + actualFailureCause.get().getCheckpointFailureReason(), + is(CheckpointFailureReason.JOB_FAILOVER_REGION)); + }; + } + + private static OneShotLatch failingPipelineLatch; + private static OneShotLatch succeedingPipelineLatch; + + /** + * FLINK-21030 + * + * <p>Tests the handling of a failure that happened while stopping an embarrassingly parallel + * job with a Savepoint. The test expects that the stopping action fails and all executions are + * in state {@code RUNNING} afterwards. + * + * @param failingSource the failing {@link SourceFunction} used in one of the two pipelines. + * @param expectedMaximumNumberOfRestarts the maximum number of restarts allowed by the restart + * strategy. + * @param exceptionAssertion asserts the client-call exception to verify that the right error + * was handled. 
+ * @see SavepointITCase#failingPipelineLatch The latch used to trigger the successful start of + * the later on failing pipeline. + * @see SavepointITCase#succeedingPipelineLatch The latch that triggers the successful start of + * the succeeding pipeline. + * @throws Exception if an error occurred while running the test. + */ + private static void testStopWithFailingSourceInOnePipeline( + InfiniteTestSource failingSource, + File savepointDir, + int expectedMaximumNumberOfRestarts, + BiConsumer<JobID, ExecutionException> exceptionAssertion) + throws Exception { + MiniClusterWithClientResource cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder().build()); + + failingPipelineLatch = new OneShotLatch(); + succeedingPipelineLatch = new OneShotLatch(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig() + .setRestartStrategy( + RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 10)); Review comment: ```suggestion RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0)); ``` Let's not waste CI time ;) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); Review comment: nit: maybe move the creation of this list outside the future creation to improve readability ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() Review comment: We are not tracking this future anymore, but relying on the completion of all execution's termination futures. 
This seems to be sufficient, because the ExecutionGraph is doing the same on all relevant operations (suspend, failJob) ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } + @Test + public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { + // initially, we don't allow any restarts since the first phase (savepoint creation) + // succeeds without any failures + testRestartBackoffTimeStrategy.setCanRestart(false); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint = + new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointAborted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint(savepointFolder, false); + checkpointTriggeredLatch.await(); + + acknowledgePendingCheckpoint(scheduler, 1); + + assertThat( + "Both the executions where notified about the completed checkpoint.", + executionAttemptIdsWithCompletedCheckpoint, + containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + + // The savepoint creation succeeded a failure happens in the second phase when finishing + // the tasks. That's why, the restarting policy is enabled. 
+ testRestartBackoffTimeStrategy.setCanRestart(true); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + + // the restarts due to local failure handling and global job fail-over are triggered + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<FlinkException> flinkException = + ExceptionUtils.findThrowable(e, FlinkException.class); + + assertTrue(flinkException.isPresent()); + assertThat( + flinkException.get().getMessage(), + is( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobGraph.getJobID()))); + } + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2); + final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> { + executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId); + checkpointAbortionConfirmedLatch.countDown(); + }); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointCompleted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint("savepoint-path", false); + checkpointTriggeredLatch.await(); + + final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new 
AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + final DeclineCheckpoint declineCheckpoint = + new DeclineCheckpoint( + jobGraph.getJobID(), + failingExecutionAttemptId, + 1, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location"); + + // we need to wait for the confirmations to be collected since this is running in a separate + // thread + checkpointAbortionConfirmedLatch.await(); + + assertThat( + "Both of the executions where notified about the aborted checkpoint.", + executionAttemptIdsWithAbortedCheckpoint, + is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId))); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + + // the restart due to failed checkpoint handling triggering a global job fail-over + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<CheckpointException> actualCheckpointException = + findThrowable(e, CheckpointException.class); + assertTrue(actualCheckpointException.isPresent()); + assertThat( + actualCheckpointException.get().getCheckpointFailureReason(), + is(CheckpointFailureReason.CHECKPOINT_DECLINED)); + } + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraph(); + // set checkpoint timeout to a low value to simulate checkpoint expiration + enableCheckpointing(jobGraph, 10); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // we have to set a listener that checks for the termination of the checkpoint handling + OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + checkpointAbortionWasTriggered.trigger()); + + // the failure handling has to happen in the same thread as the checkpoint coordination - + // that's why we have to instantiate a separate ThreadExecutorService here + final ScheduledExecutorService singleThreadExecutorService = + Executors.newSingleThreadScheduledExecutor(); + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + singleThreadExecutorService); + + final DefaultScheduler scheduler = + CompletableFuture.supplyAsync( + () -> + createSchedulerAndStartScheduling( + jobGraph, mainThreadExecutor), + mainThreadExecutor) + .get(); + + final ExecutionAttemptID succeedingExecutionAttemptId = + 
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + final CompletableFuture<String> stopWithSavepointFuture = + CompletableFuture.supplyAsync( + () -> { + // we have to make sure that the tasks are running before + // stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + failingExecutionAttemptId, + ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.RUNNING)); + + return scheduler.stopWithSavepoint("savepoint-path", false); + }, + mainThreadExecutor) + .get(); + + checkpointTriggeredLatch.await(); + + final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + + // we need to wait for the expired checkpoint to be handled + checkpointAbortionWasTriggered.await(); + + CompletableFuture.runAsync( + () -> { + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + failingExecutionAttemptId, + ExecutionState.FAILED)); + + // the restart due to failed checkpoint handling triggering a global job + // fail-over + assertThat( + taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + }, + mainThreadExecutor) + .get(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<CheckpointException> actualCheckpointException = + findThrowable(e, CheckpointException.class); + assertTrue(actualCheckpointException.isPresent()); + assertThat( + actualCheckpointException.get().getCheckpointFailureReason(), + is(CheckpointFailureReason.CHECKPOINT_EXPIRED)); + } + + // we have to wait for the main executor to be finished with restarting tasks + singleThreadExecutorService.shutdown(); + singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS); + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepoint() throws Exception { + // we don't allow any restarts during the happy path + testRestartBackoffTimeStrategy.setCanRestart(false); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint = + new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + 
(ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointAborted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final Set<ExecutionAttemptID> executionAttemptIds = + StreamSupport.stream( + scheduler + .getExecutionGraph() + .getAllExecutionVertices() + .spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getAttemptId) + .collect(Collectors.toSet()); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + executionAttemptIds.forEach( + id -> + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), id, ExecutionState.RUNNING))); + + final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint(savepointFolder, false); + checkpointTriggeredLatch.await(); + + acknowledgePendingCheckpoint(scheduler, 1); + + assertThat( + "Both the executions where notified about the completed checkpoint.", + executionAttemptIdsWithCompletedCheckpoint, + containsInAnyOrder(executionAttemptIds.toArray())); + + executionAttemptIds.forEach( + id -> + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), id, ExecutionState.FINISHED))); + + try { + assertThat( + stopWithSavepointFuture.get(), + startsWith(String.format("file:%s", savepointFolder))); + } catch (ExecutionException e) { + fail("No exception is expected."); + } Review comment: Can't you just remove the try catch and throw the exception out of the test. Then, we would get the stack trace + message also in CI output. 
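For illustration, the simplified assertion could look roughly like the sketch below. This is only a sketch of the idea, not a tested change: it assumes the enclosing `testStopWithSavepoint()` method keeps its `throws Exception` clause, so a failing `stopWithSavepointFuture.get()` simply propagates and fails the test with the full stack trace in the CI output.
```java
// Sketch: drop the try/catch and let any ExecutionException propagate out of the test.
// Assumes the surrounding test method is declared with `throws Exception`.
assertThat(
        stopWithSavepointFuture.get(),
        startsWith(String.format("file:%s", savepointFolder)));
```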
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } + @Test + public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { + // initially, we don't allow any restarts since the first phase (savepoint creation) + // succeeds without any failures + testRestartBackoffTimeStrategy.setCanRestart(false); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint = + new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointAborted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint(savepointFolder, false); + checkpointTriggeredLatch.await(); + + acknowledgePendingCheckpoint(scheduler, 1); + + assertThat( + "Both the executions where notified about the completed checkpoint.", + executionAttemptIdsWithCompletedCheckpoint, + containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + + // The savepoint creation succeeded a failure happens in the second phase when finishing + // the tasks. That's why, the restarting policy is enabled. 
+ testRestartBackoffTimeStrategy.setCanRestart(true); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + + // the restarts due to local failure handling and global job fail-over are triggered + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<FlinkException> flinkException = + ExceptionUtils.findThrowable(e, FlinkException.class); + + assertTrue(flinkException.isPresent()); + assertThat( + flinkException.get().getMessage(), + is( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobGraph.getJobID()))); + } + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2); + final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> { + executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId); + checkpointAbortionConfirmedLatch.countDown(); + }); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointCompleted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint("savepoint-path", false); + checkpointTriggeredLatch.await(); + + final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new 
AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + final DeclineCheckpoint declineCheckpoint = + new DeclineCheckpoint( + jobGraph.getJobID(), + failingExecutionAttemptId, + 1, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location"); + + // we need to wait for the confirmations to be collected since this is running in a separate + // thread + checkpointAbortionConfirmedLatch.await(); + + assertThat( + "Both of the executions where notified about the aborted checkpoint.", + executionAttemptIdsWithAbortedCheckpoint, + is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId))); Review comment: Do we need to enforce order here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() - .handle( - (jobstatus, throwable) -> { - if (throwable != null) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: {}", - jobGraph.getJobID(), - throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", - jobGraph.getJobID(), - jobstatus); - throw new CompletionException( - new FlinkException( - "Reached state " - + jobstatus - + " instead of FINISHED.")); - } - return jobstatus; - }); - return savepointFuture - .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .thenCompose( + path -> + executionGraphTerminationFuture + .handleAsync( Review comment: If we are going to change the behavior here, we also need to change the `FutureUtils.combineAll()` to a `waitForAll()`, to get all the final states. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ########## @@ -908,38 +909,57 @@ public void reportCheckpointMetrics( // will be restarted by the CheckpointCoordinatorDeActivator. 
checkpointCoordinator.stopCheckpointScheduler(); + final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture = + FutureUtils.combineAll( + StreamSupport.stream( + executionGraph.getAllExecutionVertices().spliterator(), + false) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getTerminalStateFuture) + .collect(Collectors.toList())); + final CompletableFuture<String> savepointFuture = checkpointCoordinator .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - final CompletableFuture<JobStatus> terminationFuture = - executionGraph - .getTerminationFuture() - .handle( - (jobstatus, throwable) -> { - if (throwable != null) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: {}", - jobGraph.getJobID(), - throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info( - "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", - jobGraph.getJobID(), - jobstatus); - throw new CompletionException( - new FlinkException( - "Reached state " - + jobstatus - + " instead of FINISHED.")); - } - return jobstatus; - }); - return savepointFuture - .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path))) + .thenCompose( + path -> + executionGraphTerminationFuture + .handleAsync( + (executionStates, throwable) -> { + Set<ExecutionState> nonFinishedStates = + extractNonFinishedStates( + executionStates); + if (throwable != null) { Review comment: Under which circumstances are we running into this condition? (Background: We should end up in this condition if the termination future of the `Execution` is completed exceptionally. I couldn't spot where that's happening. 
But if there's ever going to be a code change that returns a throwable here, then we might not properly go into a global failover) ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } + @Test + public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { + // initially, we don't allow any restarts since the first phase (savepoint creation) + // succeeds without any failures + testRestartBackoffTimeStrategy.setCanRestart(false); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint = + new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointAborted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint(savepointFolder, false); + checkpointTriggeredLatch.await(); + + acknowledgePendingCheckpoint(scheduler, 1); + + assertThat( + "Both the executions where notified about the completed checkpoint.", + executionAttemptIdsWithCompletedCheckpoint, + containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + + // The savepoint creation succeeded a failure happens in the second phase when finishing + // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + + // the restarts due to local failure handling and global job fail-over are triggered + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<FlinkException> flinkException = + ExceptionUtils.findThrowable(e, FlinkException.class); + + assertTrue(flinkException.isPresent()); + assertThat( + flinkException.get().getMessage(), + is( + String.format( + "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.", + jobGraph.getJobID()))); + } + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2); + final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> { + executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId); + checkpointAbortionConfirmedLatch.countDown(); + }); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointCompleted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint("savepoint-path", false); + checkpointTriggeredLatch.await(); + + final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new 
AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + final DeclineCheckpoint declineCheckpoint = + new DeclineCheckpoint( + jobGraph.getJobID(), + failingExecutionAttemptId, + 1, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location"); + + // we need to wait for the confirmations to be collected since this is running in a separate + // thread + checkpointAbortionConfirmedLatch.await(); + + assertThat( + "Both of the executions where notified about the aborted checkpoint.", + executionAttemptIdsWithAbortedCheckpoint, + is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId))); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + + // the restart due to failed checkpoint handling triggering a global job fail-over + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1)); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); + + try { + stopWithSavepointFuture.get(); + fail("An exception is expected."); + } catch (ExecutionException e) { + Optional<CheckpointException> actualCheckpointException = + findThrowable(e, CheckpointException.class); + assertTrue(actualCheckpointException.isPresent()); + assertThat( + actualCheckpointException.get().getCheckpointFailureReason(), + is(CheckpointFailureReason.CHECKPOINT_DECLINED)); + } + + assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); + } + + @Test + public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraph(); + // set checkpoint timeout to a low value to simulate checkpoint expiration + enableCheckpointing(jobGraph, 10); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // we have to set a listener that checks for the termination of the checkpoint handling + OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + checkpointAbortionWasTriggered.trigger()); + + // the failure handling has to happen in the same thread as the checkpoint coordination - + // that's why we have to instantiate a separate ThreadExecutorService here + final ScheduledExecutorService singleThreadExecutorService = Review comment: Why can't we use `ComponentMainThreadExecutorServiceAdapter.forMainThread()`? The proper solution would be passing checkpoint coordinator or its timer thread from the test into it. But that introducing this change seems beyond the scope of this PR. 
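For reference, the alternative being asked about would look roughly like the sketch below. This is only a sketch, not a verified drop-in replacement: whether it behaves the same depends on how the checkpoint coordinator's timer thread interacts with the test, which is exactly the open question above.
```java
// Sketch: run the "main thread" actions directly in the calling test thread instead of
// creating a dedicated single-thread executor for the scheduler.
final ComponentMainThreadExecutor mainThreadExecutor =
        ComponentMainThreadExecutorServiceAdapter.forMainThread();

final DefaultScheduler scheduler =
        createSchedulerAndStartScheduling(jobGraph, mainThreadExecutor);
```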
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0))); } + @Test + public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception { + // initially, we don't allow any restarts since the first phase (savepoint creation) + // succeeds without any failures + testRestartBackoffTimeStrategy.setCanRestart(false); + + final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled(); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // collect executions to which the checkpoint completion was confirmed + final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint = + new ArrayList<>(); + taskManagerGateway.setNotifyCheckpointCompleteConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId)); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (ignored0, ignored1, ignored2, ignored3) -> { + throw new UnsupportedOperationException("notifyCheckpointAborted was called"); + }); + + final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + + final ExecutionAttemptID succeedingExecutionAttemptId = + Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + // we have to make sure that the tasks are running before stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING)); + + final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath(); + + // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway + final CompletableFuture<String> stopWithSavepointFuture = + scheduler.stopWithSavepoint(savepointFolder, false); + checkpointTriggeredLatch.await(); + + acknowledgePendingCheckpoint(scheduler, 1); + + assertThat( + "Both the executions where notified about the completed checkpoint.", + executionAttemptIdsWithCompletedCheckpoint, + containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId)); + + // The savepoint creation succeeded a failure happens in the second phase when finishing + // the tasks. That's why, the restarting policy is enabled. 
+ testRestartBackoffTimeStrategy.setCanRestart(true); + + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED)); + + // the restarts due to local failure handling and global job fail-over are triggered + assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2)); Review comment: This feels like the test depends on some implementation details of the test's subject. It is likely that somebody who's changing the scheduler, and subsequently runs into a test failure here will just adjust this number to make the test pass again. Can you comment which two scheduled tasks are expected to be in there? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org