XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r575246034



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception {
+        // initially, we don't allow any restarts since the first phase (savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both executions were notified about the completed checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded, but a failure happens in the second phase when
+        // finishing the tasks. That's why the restart policy is now enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint abortion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2);
+        final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location");
+
+        // we need to wait for the confirmations to be collected since this is running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both executions were notified about the aborted checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+
+        // the restart is due to the failed checkpoint handling triggering a global job fail-over
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the checkpoint coordination -
+        // that's why we have to instantiate a separate single-thread ScheduledExecutorService here
+        final ScheduledExecutorService singleThreadExecutorService =

Review comment:
       Just for the record: We have to use a dedicated `ScheduledExecutorService` since the failure handling is triggered by the [CheckpointCanceler through the abortion mechanism](https://github.com/XComp/flink/blob/35757ed3c11f32769d6d99882596f78f8b8e12b5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1982) which runs in the `CheckpointCoordinator`'s [timer Executor](https://github.com/XComp/flink/blob/35757ed3c11f32769d6d99882596f78f8b8e12b5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L726).
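       As a rough illustration of that setup (a sketch, not code from the PR: only `Executors.newSingleThreadScheduledExecutor()` is plain JDK here, and how the executor is wired into the scheduler under test is left out since it depends on the test's builder):

           // imports assumed: java.util.concurrent.Executors, java.util.concurrent.ScheduledExecutorService
           // dedicated single-thread executor so that checkpoint expiration and the resulting
           // failure handling run on one well-defined thread
           final ScheduledExecutorService singleThreadExecutorService =
                   Executors.newSingleThreadScheduledExecutor();
           try {
               // ... create the scheduler with this executor acting as the
               // CheckpointCoordinator's timer and run the stop-with-savepoint test ...
           } finally {
               singleThreadExecutorService.shutdownNow();
           }

       Shutting the executor down in the `finally` block keeps the test from leaking threads even if an assertion fails.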




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

