becketqin commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r471240700



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                        request.getOnCompletionFuture()),
                                                timer);
 
-                       final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
-                                       .thenCompose(this::snapshotMasterState);
-
                        final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                                        .thenComposeAsync((pendingCheckpoint) ->
                                                        OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                                        coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
+                       // We have to take the snapshot of the master hooks after the coordinator checkpoints has completed.
+                       // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+                       // ExternallyInducedSource is used.
+                       final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+                                       .thenComposeAsync(ignored -> {
+                                               PendingCheckpoint checkpoint =
+                                                       FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
       Hmm, I am not sure. The function passed to `thenComposeAsync()` is only invoked if the previous stage completes normally. In our case, the stages are `pendingCheckpointCompletableFuture` -> `coordinatorCheckpointsComplete` -> `masterStatesComplete`. So when the code reaches here, `pendingCheckpointCompletableFuture` must have completed normally, and the `checkState()` would never fail.
   
   So the rule of thumb is: if something is done in `thenCompose`, `thenApply`, etc., there is no need to check the previous stage for errors. If a stage fails, its dependent stages complete exceptionally without running their functions, and the exception is thrown when `future.get()` is invoked on any stage downstream of the failed one.
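To illustrate the rule of thumb, here is a minimal, self-contained sketch using plain `java.util.concurrent.CompletableFuture`. It is not Flink code: the class `ThenComposeChainDemo`, the string payloads, and the single-threaded `timer` executor are hypothetical stand-ins for `pendingCheckpointCompletableFuture`, `coordinatorCheckpointsComplete`, and `masterStatesComplete`. It shows that the last callback never runs when the first stage fails, and that `get()` on the last stage still surfaces the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThenComposeChainDemo {

    /**
     * Hypothetical mirror of the chain pending -> coordinators -> master.
     * The String payloads are placeholders, not Flink types.
     */
    static CompletableFuture<String> buildChain(
            CompletableFuture<String> pending, AtomicBoolean masterCallbackRan, Executor timer) {
        CompletableFuture<String> coordinatorsDone = pending.thenComposeAsync(
                checkpoint -> CompletableFuture.completedFuture(checkpoint + "+coordinators"), timer);
        return coordinatorsDone.thenComposeAsync(ignored -> {
            masterCallbackRan.set(true);
            // Reaching this callback implies coordinatorsDone completed normally,
            // which in turn implies pending completed normally, so join() cannot throw here.
            String checkpoint = pending.join();
            return CompletableFuture.completedFuture(checkpoint + "+master");
        }, timer);
    }

    /** Normal path: the pending future succeeds and the master callback runs. */
    static String runNormal() {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pending = new CompletableFuture<>();
            CompletableFuture<String> master = buildChain(pending, new AtomicBoolean(), timer);
            pending.complete("checkpoint-1");
            return master.join();
        } finally {
            timer.shutdown();
        }
    }

    /** Failure path: returns {masterCallbackRan, getSurfacedOriginalCause}. */
    static boolean[] runFailure() {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            AtomicBoolean ran = new AtomicBoolean(false);
            CompletableFuture<String> pending = new CompletableFuture<>();
            CompletableFuture<String> master = buildChain(pending, ran, timer);
            IllegalStateException cause = new IllegalStateException("trigger failed");
            pending.completeExceptionally(cause);
            boolean sawOriginalCause = false;
            try {
                master.get();
            } catch (ExecutionException e) {
                // get() unwraps the CompletionException added by the intermediate stages.
                sawOriginalCause = e.getCause() == cause;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new boolean[] {ran.get(), sawOriginalCause};
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runNormal());
        boolean[] failure = runFailure();
        System.out.println("master callback ran: " + failure[0]
                + ", original cause surfaced: " + failure[1]);
    }
}
```

Running `main` prints `checkpoint-1+master`, then `master callback ran: false, original cause surfaced: true`. This is why the explicit error check inside the last callback is redundant: the callback is simply skipped on failure, and the caller still sees the error through `get()`.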

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */

Review comment:
       I ended up testing this in a single case for simplicity. Overall, the test checks that the checkpoint steps are executed in the expected order.
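A common way to test that steps run "in the expected order" is to record each step in a shared list and compare it with the expected sequence. The sketch below is hypothetical and does not reproduce the Flink test: `CheckpointOrderSketch`, the step names, and simulating the task acknowledgements inside the master-hook stage are all assumptions, chosen to mirror the Javadoc scenario where tasks finish before the master stage does.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointOrderSketch {

    /** Runs the hypothetical checkpoint steps and returns the order in which they happened. */
    static List<String> runCheckpoint() {
        // Thread-safe list because steps run on the executor thread as well as the caller.
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Void> pending = CompletableFuture.completedFuture(null);
            CompletableFuture<Void> coordinatorsDone =
                    pending.thenRunAsync(() -> events.add("coordinators"), timer);

            // Hypothetical: the task acknowledgements are signalled by this separate future.
            CompletableFuture<Void> taskAcks = new CompletableFuture<>();
            CompletableFuture<Void> masterDone = coordinatorsDone.thenRunAsync(() -> {
                events.add("masterHooks");
                // Simulates an externally induced source whose tasks acknowledge
                // while the master-hook stage is still running.
                events.add("taskAcks");
                taskAcks.complete(null);
            }, timer);

            // The checkpoint completes only once both the master stage and the task acks are done.
            CompletableFuture.allOf(masterDone, taskAcks)
                    .thenRun(() -> events.add("completed"))
                    .join();
        } finally {
            timer.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runCheckpoint());
    }
}
```

Running `main` prints `[coordinators, masterHooks, taskAcks, completed]`. Because completion waits on both futures via `CompletableFuture.allOf`, the checkpoint still completes even though the task acknowledgements arrive before the master-hook stage ends.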

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -576,7 +583,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                                                request.advanceToEndOfTime);
 
                                                                        coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+                                                                       // It is possible that the tasks has finished checkpointing at this point.
+                                                                       // So we need to complete this pending checkpoint.

Review comment:
       Correct. The synchronization can be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

