zhuzhurk commented on code in PR #21943:
URL: https://github.com/apache/flink/pull/21943#discussion_r1111563249


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1373,6 +1371,7 @@ private void cleanupAfterCompletedCheckpoint(
                     completedCheckpoint.getTimestamp(),
                     extractIdIfDiscardedOnSubsumed(lastSubsumed));
         }
+        pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);

Review Comment:
   I would prefer to do this right before the invocation of 
`reportCompletedCheckpoint(...)`, so that we report a completed checkpoint only 
after its completion future has been completed.
   Do you have any concerns about this?
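
A minimal sketch of the ordering suggested above, using only `java.util.concurrent.CompletableFuture`. The class name, the string checkpoint, and the event list are hypothetical stand-ins for illustration; this is not the actual `CheckpointCoordinator` code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration; none of these names exist in Flink. */
final class CompletionOrderSketch {

    /** Runs both steps and returns the order in which they were observed. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in for pendingCheckpoint.getCompletionFuture().
        CompletableFuture<String> completionFuture = new CompletableFuture<>();
        // Non-async callbacks run in the thread that calls complete(...).
        completionFuture.thenAccept(cp -> events.add("future completed: " + cp));

        String completedCheckpoint = "checkpoint-1";

        // Step 1: complete the future first.
        completionFuture.complete(completedCheckpoint);

        // Step 2: only then report the checkpoint
        // (stand-in for reportCompletedCheckpoint(...)).
        events.add("reported: " + completedCheckpoint);

        return events;
    }
}
```

With this ordering, anything waiting on the future is notified before the report step runs.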



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -107,7 +107,11 @@ public enum TaskAcknowledgeResult {
     /** The checkpoint properties. */
     private final CheckpointProperties props;
 
-    /** The promise to fulfill once the checkpoint has been completed. */
+    /**
+     * Because onCompletionPromise is not required to synchronize with the completion status of
+     * pendingCheckpoint. The promise to fulfill once the checkpoint has been completed and is added
+     * to the store.
+     */

Review Comment:
   -> The promise to fulfill once the checkpoint has been completed. Note that 
it will be completed only after the checkpoint has been successfully added to 
the CompletedCheckpointStore.
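
A minimal sketch of the contract described in that Javadoc wording, assuming a simplified store modelled as a `Consumer<String>`. All names here are hypothetical, and failing the promise when the store throws is an assumption made for illustration, not confirmed behaviour of `PendingCheckpoint`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical illustration; not the Flink implementation. */
final class PromiseAfterStoreSketch {

    /**
     * Adds the checkpoint to the store and completes the promise only if that succeeds.
     * Assumption: a store failure fails the promise instead of leaving it pending.
     */
    static CompletableFuture<String> finalizeCheckpoint(
            String checkpoint, Consumer<String> addToStore) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        try {
            // Stand-in for adding the checkpoint to the CompletedCheckpointStore.
            addToStore.accept(checkpoint);
            // Reached only when the store accepted the checkpoint.
            promise.complete(checkpoint);
        } catch (RuntimeException e) {
            promise.completeExceptionally(e);
        }
        return promise;
    }
}
```

A caller holding the promise therefore never observes a successful completion for a checkpoint that the store rejected.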



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -832,6 +837,79 @@ void restoreStateWhenRestartingTasks() throws Exception {
         assertThat(masterHook.getRestoreCount()).isOne();
     }
 
+    @Test
+    void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        enableCheckpointing(jobGraph);
+
+        final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
+
+        CompletedCheckpointStore store =
+                TestingCompletedCheckpointStore.builder()
+                        .withGetAllCheckpointsSupplier(Collections::emptyList)
+                        .withAddCheckpointAndSubsumeOldestOneFunction(
+                                (ignoredCompletedCheckpoint,
+                                        ignoredCheckpointsCleaner,
+                                        ignoredPostCleanup) -> {
+                                    throw new RuntimeException(
+                                            "Throw exception when add checkpoint to store.");
+                                })
+                        .withGetSharedStateRegistrySupplier(SharedStateRegistryImpl::new)
+                        .build();
+
+        ComponentMainThreadExecutor mainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        new DirectScheduledExecutorService());
+        final DefaultScheduler scheduler;
+        try {
+            scheduler =
+                    createSchedulerBuilder(jobGraph, mainThreadExecutor)
+                            .setCheckpointRecoveryFactory(
+                                    new TestingCheckpointRecoveryFactory(
+                                            store, new StandaloneCheckpointIDCounter()))
+                            .build();
+            mainThreadExecutor.execute(scheduler::startScheduling);
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review Comment:
   This try-catch is not needed.


