XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576628701
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                () -> {
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionState(
+                                    jobGraph.getJobID(),
+                                    succeedingExecutionAttemptId,
+                                    ExecutionState.FINISHED));
+
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionState(
+                                    jobGraph.getJobID(),
+                                    failingExecutionAttemptId,
+                                    ExecutionState.FAILED));
+
+                    // the restart due to failed checkpoint handling triggering a global job
+                    // fail-over
+                    assertThat(
+                            taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));

Review comment:
   The aborted checkpoint will also trigger a [job fail-over in the case of a synchronous checkpoint](https://github.com/XComp/flink/blob/35757ed3c11f32769d6d99882596f78f8b8e12b5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1921). Looking at the code again, I realize that declining a checkpoint due to a failure while taking a snapshot of the state and aborting the checkpoint due to checkpoint expiration (e.g. the TaskManager not being responsive) trigger the same error handling. That raises the question of whether a single test would be enough here, since we are testing the same `DefaultScheduler` functionality in both cases. It feels like we should rather add a test to `CheckpointCoordinatorTest` to verify that an expired checkpoint leads to a job fail-over as well. Right now, only the error handling for a declined checkpoint is tested, in [CheckpointCoordinatorTest.jobFailsIfInFlightSynchronousSavepointIsDiscarded](https://github.com/XComp/flink/blob/3d400072f28ac4ff37bd38ff077bd95d57302658/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java#L2751).
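   A minimal sketch of the suggested `CheckpointCoordinatorTest` case could look like the following. It is only an illustration, not a verified implementation: the test name and the helper `createCheckpointCoordinatorWithFailJobCallback` are hypothetical stand-ins for whatever coordinator/failure-manager setup `jobFailsIfInFlightSynchronousSavepointIsDiscarded` already uses, the imports are assumed to be those already present in the test class, and the exact `CheckpointCoordinator` calls (`triggerSynchronousSavepoint`, `abortPendingCheckpoints`) would need to be double-checked against the current API.

   ```java
   // hypothetical addition to CheckpointCoordinatorTest, placed next to
   // jobFailsIfInFlightSynchronousSavepointIsDiscarded
   @Test
   public void jobFailsIfInFlightSynchronousSavepointExpires() throws Exception {
       // completed by the (hypothetical) fail-job callback wired into the CheckpointFailureManager
       final CompletableFuture<Throwable> failJobCallbackInvocation = new CompletableFuture<>();

       // hypothetical helper: builds a CheckpointCoordinator whose CheckpointFailureManager
       // reports job failures into the given future instead of failing a real ExecutionGraph
       final CheckpointCoordinator coordinator =
               createCheckpointCoordinatorWithFailJobCallback(failJobCallbackInvocation);

       // trigger the synchronous savepoint that backs stop-with-savepoint
       final CompletableFuture<CompletedCheckpoint> savepointFuture =
               coordinator.triggerSynchronousSavepoint(false, "savepoint-path");

       // simulate checkpoint expiration by aborting the pending checkpoint with CHECKPOINT_EXPIRED
       coordinator.abortPendingCheckpoints(
               new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));

       // the savepoint future has to fail ...
       assertThat(savepointFuture.isCompletedExceptionally(), is(true));
       // ... and the expired synchronous savepoint has to escalate into a job fail-over
       assertThat(failJobCallbackInvocation.isDone(), is(true));
   }
   ```

   Aborting the pending checkpoint with `CHECKPOINT_EXPIRED` keeps the sketch deterministic; a more faithful variant could use a manually triggered timer so that the checkpoint actually times out, mirroring the low checkpoint timeout used in the `DefaultSchedulerTest` above.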