XComp commented on a change in pull request #14847: URL: https://github.com/apache/flink/pull/14847#discussion_r578567121
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ########## @@ -817,6 +818,126 @@ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING)); } + @Test + public void testStopWithSavepointFailingWithExpiredCheckpoint() throws Exception { + // we allow restarts right from the start since the failure is going to happen in the first + // phase (savepoint creation) of stop-with-savepoint + testRestartBackoffTimeStrategy.setCanRestart(true); + + final JobGraph jobGraph = createTwoVertexJobGraph(); + // set checkpoint timeout to a low value to simulate checkpoint expiration + enableCheckpointing(jobGraph, 10); + + final SimpleAckingTaskManagerGateway taskManagerGateway = + new SimpleAckingTaskManagerGateway(); + final CountDownLatch checkpointTriggeredLatch = + getCheckpointTriggeredLatch(taskManagerGateway); + + // we have to set a listener that checks for the termination of the checkpoint handling + OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch(); + taskManagerGateway.setNotifyCheckpointAbortedConsumer( + (executionAttemptId, jobId, actualCheckpointId, timestamp) -> + checkpointAbortionWasTriggered.trigger()); + + // the failure handling has to happen in the same thread as the checkpoint coordination - + // that's why we have to instantiate a separate ThreadExecutorService here + final ScheduledExecutorService singleThreadExecutorService = + Executors.newSingleThreadScheduledExecutor(); + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + singleThreadExecutorService); + + final DefaultScheduler scheduler = + CompletableFuture.supplyAsync( + () -> + createSchedulerAndStartScheduling( + jobGraph, mainThreadExecutor), + mainThreadExecutor) + .get(); + + final ExecutionAttemptID succeedingExecutionAttemptId = + 
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0) + .getCurrentExecutionAttempt() + .getAttemptId(); + final ExecutionAttemptID failingExecutionAttemptId = + Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices()) + .getCurrentExecutionAttempt() + .getAttemptId(); + + final CompletableFuture<String> stopWithSavepointFuture = + CompletableFuture.supplyAsync( + () -> { + // we have to make sure that the tasks are running before + // stop-with-savepoint is triggered + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + failingExecutionAttemptId, + ExecutionState.RUNNING)); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.RUNNING)); + + return scheduler.stopWithSavepoint("savepoint-path", false); + }, + mainThreadExecutor) + .get(); + + checkpointTriggeredLatch.await(); + + final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); + + final AcknowledgeCheckpoint acknowledgeCheckpoint = + new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1); + + checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location"); + + // we need to wait for the expired checkpoint to be handled + checkpointAbortionWasTriggered.await(); + + CompletableFuture.runAsync( + () -> { + scheduler.updateTaskExecutionState( + new TaskExecutionState( + jobGraph.getJobID(), + succeedingExecutionAttemptId, + ExecutionState.FINISHED));

Review comment:
Thanks for pointing that out. The tests got refactored as part of the most recent changes.

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org