pnowojski commented on a change in pull request #13044: URL: https://github.com/apache/flink/pull/13044#discussion_r467068535
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -576,7 +583,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 request.advanceToEndOfTime);
 coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+ // It is possible that the tasks has finished checkpointing at this point.
+ // So we need to complete this pending checkpoint.

Review comment:
Ok, but at least `synchronized (lock) {` wouldn't be needed anymore, right?

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
 }
 }
+ /**
+ * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+ * master hooks and finished before the master checkpoint.
+ */

Review comment:
> The other case is sort of indirectly tested by other tests which ensures all the OperatorCoordinators have finished snapshots before moving on to the next step.

But as you haven't modified those tests, and the master build was green, it seems those tests were passing with the incorrect code. Doesn't that mean we are still missing test coverage? If so, splitting this change into two separate bug fixes, each with its own test fix, would make it much easier to understand and follow which unit test covers which bug.
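The first comment relies on the fact that all task acknowledgements may already have arrived by the time triggering finishes, so the end of the triggering path has to complete the pending checkpoint itself. A minimal sketch of that situation, assuming (hypothetically) that acknowledgements and the post-trigger completion check all run on one single-threaded executor; under that assumption the check needs no `synchronized (lock)` block. `PendingCheckpointSketch`, `acknowledgeTask`, `completeIfFullyAcknowledged` and `runScenario` are illustrative names, not Flink's API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a pending checkpoint (not Flink's PendingCheckpoint).
// ASSUMPTION: every method is only called from a single "timer" thread,
// which is why no lock is used.
class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledged;
    private boolean completed;

    PendingCheckpointSketch(Set<String> tasks) {
        this.notYetAcknowledged = new HashSet<>(tasks);
    }

    void acknowledgeTask(String task) {
        notYetAcknowledged.remove(task);
    }

    // Runs at the end of triggering: if every task has already acknowledged,
    // nothing else would complete this checkpoint, so do it here.
    void completeIfFullyAcknowledged() {
        if (!completed && notYetAcknowledged.isEmpty()) {
            completed = true;
        }
    }

    boolean isCompleted() {
        return completed;
    }

    // Simulates acknowledgements arriving while triggering is still in progress,
    // followed by the post-trigger completion check, all on one thread.
    static boolean runScenario(Set<String> acknowledgedBeforeCheck) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            PendingCheckpointSketch checkpoint =
                new PendingCheckpointSketch(Set.of("task-1", "task-2"));
            for (String task : acknowledgedBeforeCheck) {
                timer.execute(() -> checkpoint.acknowledgeTask(task));
            }
            // Single-threaded executor: runs after the acknowledgements above.
            Future<Boolean> result = timer.submit(() -> {
                checkpoint.completeIfFullyAcknowledged();
                return checkpoint.isCompleted();
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            timer.shutdown();
        }
    }
}
```

With both tasks acknowledged before the check, the checkpoint completes; with one acknowledgement missing, it stays pending until a later acknowledgement arrives.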
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 request.getOnCompletionFuture()),
 timer);
- final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
 final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 .thenComposeAsync((pendingCheckpoint) ->
 OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 coordinatorsToCheckpoint, pendingCheckpoint, timer),
 timer);
+ // We have to take the snapshot of the master hooks after the coordinator checkpoints has completed.
+ // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
+ PendingCheckpoint checkpoint =
+ FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
Ok, I see how it's supposed to be working. But in that case, there is a missing `checkState` here:
```
PendingCheckpoint checkpoint =
    FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
Preconditions.checkState(
    checkpoint != null || throwable != null,
    "?????????.");
```
Maybe also deduplicate this with the same get-and-check at L553:L558. It's better to fail early with an `IllegalStateException` than with an `NPE` in `snapshotMasterState`.
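The suggested "get and check" step, shared by both call sites, could look roughly like the sketch below. It uses only the JDK: `getWithoutException` is re-implemented here under the assumption that it behaves like Flink's `FutureUtils.getWithoutException` (value if the future completed normally, otherwise `null`), and `FailEarlySketch`, `getCheckedOrFail` and the error message are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper; not part of Flink.
final class FailEarlySketch {
    private FailEarlySketch() {}

    // ASSUMPTION: mirrors FutureUtils.getWithoutException semantics:
    // the value if the future completed normally, otherwise null.
    static <T> T getWithoutException(CompletableFuture<T> future) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            return future.join();
        }
        return null;
    }

    // Single shared get-and-check, so every call site fails the same way:
    // an IllegalStateException here instead of a NullPointerException later
    // (e.g. inside snapshotMasterState).
    static <T> T getCheckedOrFail(CompletableFuture<T> future, Throwable throwable) {
        T value = getWithoutException(future);
        if (value == null && throwable == null) {
            throw new IllegalStateException(
                "Either the pending checkpoint must exist or an error must have occurred.");
        }
        return value;
    }
}
```

A caller that has no `throwable` in scope (such as a `thenComposeAsync` stage, which only runs on success) would pass `null`, so a missing checkpoint always surfaces as an `IllegalStateException`.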
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
 request.getOnCompletionFuture()),
 timer);
- final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
 final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
 .thenComposeAsync((pendingCheckpoint) ->
 OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
 coordinatorsToCheckpoint, pendingCheckpoint, timer),
 timer);
+ // We have to take the snapshot of the master hooks after the coordinator checkpoints has completed.
+ // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {

Review comment:
nit: in that case, for the future, I wouldn't mix this change (a refactor) with the bug fix, but would do it in a separate commit, for the same reasons:

> The code that completes coordinatorCheckpointsComplete needs a few clicks to see it is done in the timer as well.

It's not that easy to figure out that this is indeed just a pure refactor.
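The reordering in this hunk comes down to which future the master-hook stage is composed onto. Previously it was chained directly on `pendingCheckpointCompletableFuture`, so nothing forced it to wait for the coordinator checkpoints; now it is chained on `coordinatorCheckpointsComplete`. A minimal, self-contained sketch of the new ordering, assuming plain JDK futures and a single-threaded executor standing in for `timer` (`OrderingSketch` and the event strings are hypothetical):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of the reordered chain; not Flink code.
final class OrderingSketch {
    private OrderingSketch() {}

    static List<String> run() {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<String> pendingCheckpointFuture =
                CompletableFuture.supplyAsync(() -> "checkpoint-1", timer);

            // Stand-in for triggering and acknowledging all coordinator checkpoints.
            CompletableFuture<Void> coordinatorCheckpointsComplete =
                pendingCheckpointFuture.thenComposeAsync(
                    checkpoint -> CompletableFuture.runAsync(
                        () -> events.add("coordinators:" + checkpoint), timer),
                    timer);

            // Composed onto the coordinator future, so master hooks cannot start
            // before the coordinators are done. The pending-checkpoint future is
            // already complete here, so join() returns immediately.
            CompletableFuture<Void> masterStatesComplete =
                coordinatorCheckpointsComplete.thenComposeAsync(
                    ignored -> CompletableFuture.runAsync(
                        () -> events.add("master-hooks:" + pendingCheckpointFuture.join()), timer),
                    timer);

            masterStatesComplete.join();
            return events;
        } finally {
            timer.shutdown();
        }
    }
}
```

Because both stages are scheduled with `thenComposeAsync(..., timer)`, the sketch also illustrates the point quoted in the comment: the coordinator future and the master-hook stage are both driven by the timer executor.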