zhuzhurk commented on code in PR #21943:
URL: https://github.com/apache/flink/pull/21943#discussion_r1111563249
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1373,6 +1371,7 @@ private void cleanupAfterCompletedCheckpoint(
completedCheckpoint.getTimestamp(),
extractIdIfDiscardedOnSubsumed(lastSubsumed));
}
+ pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
Review Comment:
I would prefer to do this right before the invocation of
`reportCompletedCheckpoint(...)`, so that we report a completed checkpoint only
after its completion future has been completed.
Do you have any concerns about that?
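To make the proposed ordering concrete, here is a minimal sketch. It is not Flink code: `CompletionOrderSketch`, the event strings, and the simulated store and report steps are hypothetical stand-ins, and only `CompletableFuture` is a real API. It shows that a listener on the completion future runs before the report step when `complete(...)` is called first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in, not Flink code: illustrates the proposed ordering only. */
final class CompletionOrderSketch {

    /** Runs the three steps in the proposed order and returns the observed events. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> completionFuture = new CompletableFuture<>();

        // A listener registered before completion runs synchronously inside complete().
        completionFuture.thenAccept(cp -> events.add("future completed: " + cp));

        // 1. The checkpoint has been added to the store (simulated).
        events.add("added to store");
        // 2. Complete the future right before reporting.
        completionFuture.complete("checkpoint-1");
        // 3. Report the completed checkpoint (simulated stand-in for the report call).
        events.add("reported");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this order, anything waiting on the future observes the completed checkpoint before it shows up in the reported statistics.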
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java:
##########
@@ -107,7 +107,11 @@ public enum TaskAcknowledgeResult {
/** The checkpoint properties. */
private final CheckpointProperties props;
- /** The promise to fulfill once the checkpoint has been completed. */
+ /**
+ * Because onCompletionPromise is not required to synchronize with the completion status of
+ * pendingCheckpoint. The promise to fulfill once the checkpoint has been completed and is added
+ * to the store.
+ */
Review Comment:
-> The promise to fulfill once the checkpoint has been completed. Note that
it will be completed only after the checkpoint is successfully added to
CompletedCheckpointStore.
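A small sketch of the contract this Javadoc describes, under stated assumptions: `CompletionAfterStoreSketch`, `finalizeCheckpoint`, and the `Consumer`-based store are hypothetical, and failing the promise exceptionally when the store rejects the checkpoint is an assumption about the intended behavior, not something this diff confirms.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in, not Flink code: the promise is settled only after the store call. */
final class CompletionAfterStoreSketch {

    /** Adds the checkpoint to the (hypothetical) store first, then settles the promise. */
    static CompletableFuture<String> finalizeCheckpoint(
            String checkpoint, Consumer<String> addToStore) {
        CompletableFuture<String> completionFuture = new CompletableFuture<>();
        try {
            addToStore.accept(checkpoint);
            // Reached only if the store accepted the checkpoint.
            completionFuture.complete(checkpoint);
        } catch (RuntimeException e) {
            // Assumption: a store failure fails the promise instead of completing it.
            completionFuture.completeExceptionally(e);
        }
        return completionFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<String> stored = finalizeCheckpoint("checkpoint-1", cp -> {});
        CompletableFuture<String> rejected =
                finalizeCheckpoint(
                        "checkpoint-2",
                        cp -> {
                            throw new RuntimeException(
                                    "Throw exception when add checkpoint to store.");
                        });
        System.out.println("stored normally: " + !stored.isCompletedExceptionally());
        System.out.println("rejected exceptionally: " + rejected.isCompletedExceptionally());
    }
}
```

A caller that waits on the promise therefore never sees a checkpoint that failed to reach the store.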
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java:
##########
@@ -832,6 +837,79 @@ void restoreStateWhenRestartingTasks() throws Exception {
assertThat(masterHook.getRestoreCount()).isOne();
}
+ @Test
+ void testTriggerCheckpointAndCompletedAfterStore() throws Exception {
+ final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+ enableCheckpointing(jobGraph);
+
+ final CountDownLatch checkpointTriggeredLatch = getCheckpointTriggeredLatch();
+
+ CompletedCheckpointStore store =
+ TestingCompletedCheckpointStore.builder()
+ .withGetAllCheckpointsSupplier(Collections::emptyList)
+ .withAddCheckpointAndSubsumeOldestOneFunction(
+ (ignoredCompletedCheckpoint,
+ ignoredCheckpointsCleaner,
+ ignoredPostCleanup) -> {
+ throw new RuntimeException(
+ "Throw exception when add checkpoint to store.");
+ })
+ .withGetSharedStateRegistrySupplier(SharedStateRegistryImpl::new)
+ .build();
+
+ ComponentMainThreadExecutor mainThreadExecutor =
+ ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ new DirectScheduledExecutorService());
+ final DefaultScheduler scheduler;
+ try {
+ scheduler =
+ createSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setCheckpointRecoveryFactory(
+ new TestingCheckpointRecoveryFactory(
+ store, new StandaloneCheckpointIDCounter()))
+ .build();
+ mainThreadExecutor.execute(scheduler::startScheduling);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
Review Comment:
This try-catch is not needed, since the test method already declares
`throws Exception`.