XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576628701



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling triggering a global job
+                            // fail-over
+                            assertThat(
+                                    taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));

Review comment:
       The aborted Checkpoint will also trigger a [job failover in the case of a synchronous checkpoint](https://github.com/XComp/flink/blob/35757ed3c11f32769d6d99882596f78f8b8e12b5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1921).
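
   For context, the linked abort handling boils down to roughly the following. This is only a paraphrased sketch from my reading of that revision; the method and field names below are simplifications for illustration, not verbatim `CheckpointCoordinator` source:

   ```java
   // Paraphrased sketch of the linked CheckpointCoordinator abort path
   // (simplified; names are approximations, not the verbatim source).
   private void handleCheckpointAbortion(
           PendingCheckpoint pendingCheckpoint, CheckpointException exception) {
       if (pendingCheckpoint.getProps().isSynchronous()) {
           // an aborted synchronous savepoint (the stop-with-savepoint case)
           // escalates straight to a job failover
           failureManager.handleSynchronousSavepointFailure(exception);
       } else {
           // regular (asynchronous) checkpoints go through the
           // tolerable-failure accounting instead
           failureManager.handleJobLevelCheckpointException(
                   exception, pendingCheckpoint.getCheckpointId());
       }
   }
   ```

   The key point is the `isSynchronous()` branch: it does not distinguish between a decline and an expiration, which is why both end up in the same job fail-over.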
 
   
   Looking at the code again, I realize that declining a checkpoint due to a failure while taking a snapshot of the state and aborting the Checkpoint due to checkpoint expiration (e.g. the TaskManager not being responsive) trigger the same error handling. That raises the question of whether one test would be enough, since we're testing the same `DefaultScheduler` functionality in both cases. It feels like we should rather add a test to `CheckpointCoordinatorTest` to verify that an expired Checkpoint leads to the job fail-over as well; see the sketch below. Right now, only the error handling for a declined Checkpoint is tested in [CheckpointCoordinatorTest.jobFailsIfInFlightSynchronousSavepointIsDiscarded](https://github.com/XComp/flink/blob/3d400072f28ac4ff37bd38ff077bd95d57302658/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java#L2751).
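
   Such a test could mirror `jobFailsIfInFlightSynchronousSavepointIsDiscarded` but let the savepoint time out instead of declining it. A minimal sketch, assuming the existing `CheckpointCoordinatorTest` fixtures (a `getCheckpointCoordinator(CheckpointFailureManager)`-style helper configured with a very low checkpoint timeout, the `manuallyTriggeredScheduledExecutor` backing the checkpoint timer) and this revision's `triggerSynchronousSavepoint(...)` signature; these would need to be aligned with the actual test utilities:

   ```java
   @Test
   public void jobFailsIfInFlightSynchronousSavepointExpires() throws Exception {
       // Sketch only: fixture helpers and signatures are assumptions modeled
       // on jobFailsIfInFlightSynchronousSavepointIsDiscarded.
       final AtomicBoolean jobFailed = new AtomicBoolean(false);

       // failure manager callback that records the expected job fail-over
       final CheckpointFailureManager failureManager =
               new CheckpointFailureManager(
                       0,
                       new CheckpointFailureManager.FailJobCallback() {
                           @Override
                           public void failJob(Throwable cause) {
                               jobFailed.set(true);
                           }

                           @Override
                           public void failJobDueToTaskFailure(
                                   Throwable cause, ExecutionAttemptID failingTask) {
                               jobFailed.set(true);
                           }
                       });

       // coordinator with a very low checkpoint timeout so the savepoint expires
       final CheckpointCoordinator coordinator = getCheckpointCoordinator(failureManager);

       // trigger the synchronous savepoint that backs stop-with-savepoint
       final CompletableFuture<CompletedCheckpoint> savepointFuture =
               coordinator.triggerSynchronousSavepoint(false, "savepoint-dir");

       // fire the checkpoint canceller that the coordinator scheduled on the
       // timer; this aborts the pending savepoint with CHECKPOINT_EXPIRED
       manuallyTriggeredScheduledExecutor.triggerScheduledTasks();

       // expiration should behave like a decline: the savepoint future fails
       // and the job is failed over
       assertTrue(savepointFuture.isCompletedExceptionally());
       assertTrue(jobFailed.get());
   }
   ```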



