XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576615862
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }

+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws Exception {
+        // initially, we don't allow any restarts since the first phase (savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final String savepointFolder = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both executions were notified about the completed checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded, but a failure happens in the second phase when
+        // finishing the tasks. That's why the restart policy is enabled now.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exception {
+        // we allow restarts right from the start since the failure is going to happen in the
+        // first phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint abortion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new CountDownLatch(2);
+        final List<ExecutionAttemptID> executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, "unknown location");
+
+        // we need to wait for the confirmations to be collected since this is running in a
+        // separate thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both executions were notified about the aborted checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));

Review comment:
       That is a mistake on my end - it proves that I didn't fully anticipate how the `DefaultSchedulerTest` needs to be implemented. Please correct me if I'm wrong: the tasks need to be cancelled instead, since the cancellation call is sent by the `TaskManager`. That will trigger the restart of the tasks that was initiated by the `CheckpointCoordinator`. I came up with this code version (where the state changes to `FINISHED`/`FAILED`) due to the initial use case of FLINK-21030. `FINISHED`/`FAILED` made the test succeed as well because both are terminal states like `CANCELED`. Any terminal state triggers the restart of the tasks because the restart precedes the completion of the related `Execution`'s [releaseFuture](https://github.com/XComp/flink/blob/7ea786ac6af7286eb147449729316418b38cdafe/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L378). I fixed it by transitioning to the `CANCELED` state now.
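       To make the fix concrete, a minimal sketch of the corrected transition (not the committed code, and reusing the `scheduler`, `jobGraph`, and attempt-id variables defined in the diff above): both executions are reported as `CANCELED` instead of `FINISHED`/`FAILED`, mirroring the cancellation calls the `TaskManager` would send.

       ```java
       // Sketch of the fix: report both executions in CANCELED (a terminal state),
       // mimicking the cancellation that the TaskManager sends after the
       // CheckpointCoordinator initiated the restart. Any terminal state releases the
       // Executions and thereby triggers the scheduled restart.
       scheduler.updateTaskExecutionState(
               new TaskExecutionState(
                       jobGraph.getJobID(), succeedingExecutionAttemptId, ExecutionState.CANCELED));
       scheduler.updateTaskExecutionState(
               new TaskExecutionState(
                       jobGraph.getJobID(), failingExecutionAttemptId, ExecutionState.CANCELED));
       ```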