rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r894476700


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +56,9 @@ public class CheckpointsCleaner implements Serializable, 
AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
 
+    /** All subsumed checkpoints. */
+    private final Map<Long, CompletedCheckpoint> subsumedCheckpoints = new 
HashMap<>();

Review Comment:
   Looks like this field can be a `List` now?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) 
{
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), 
completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, 
Executor executor) {

Review Comment:
   Could you please add javadoc to this method?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,12 +59,13 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, 
long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or 
any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
-    void unregisterUnusedState(long lowestCheckpointID);
+    Set<Long> unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   Could you please update javadoc and describe what does this method returns 
now?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void 
testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws 
Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), 
miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 
100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void 
testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws 
Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);

Review Comment:
   This test relies on materialization, but the interval is quite big and might 
prevent it from happening - should it be decreased?
   
   And ideally, we should verify the presense of the materialized part - 
similar to `ChangelogPeriodicMaterializationITCase`.



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void 
testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws 
Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), 
miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 
100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);

Review Comment:
   Could you please explain why this `sleep` is needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) 
{
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), 
completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, 
Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = 
subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) 
{
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", 
checkpoint.getCheckpointID());
+                        cleanCheckpoint(
+                                checkpoint,
+                                checkpoint.shouldBeDiscardedOnSubsume(),
+                                postCleanAction,
+                                executor);
+                        iterator.remove();
+                    } catch (Exception e) {
+                        LOG.warn("Fail to discard the old checkpoint {}.", 
checkpoint);
+                    }
+                }
+            }
+        }
+    }
+
+    @VisibleForTesting
+    void cleanSubsumedCheckpointsWithException(
+            long upTo,
+            Set<Long> stillInUse,
+            Runnable postCleanAction,
+            Executor executor,
+            DiscardException discardException) {

Review Comment:
   IIUC, this method is only used in test; and that test verifies this method 
behavior. This doesn't make much sense to me, maybe we can just remove both?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void 
testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws 
Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), 
miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 
100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);
+        miniCluster.submitJob(thirdJobGraph).get();
+        waitForAllTaskRunning(miniCluster, thirdJobGraph.getJobID(), true);
+        Thread.sleep(500);
+        miniCluster.triggerCheckpoint(thirdJobGraph.getJobID()).get();
+        miniCluster.cancelJob(thirdJobGraph.getJobID()).get();
+    }
+
+    @Test
+    public void testCheckpointFolderDeletion() throws Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // cancel after next materialization
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 100);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        Thread.sleep(1000);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        assertFalse(checkpointFolderExists(firstRestorePath.substring(5)));

Review Comment:
   1. Ideally, folder deletion should be tested also in case of checkpoint 
subsumption.
   2. Is it guaranteed that the folder is cleaned up by the time 
`cancelJob.get` returns? If not, the assertion might be flaky



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.java:
##########
@@ -50,6 +86,78 @@ public void 
testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception
         testSwitchEnv(getEnv(true, NUM_SLOTS), getEnv(false, NUM_SLOTS / 2));
     }
 
+    @Test
+    public void testSwitchFromDisablingToEnablingInClaimMode() throws 
Exception {
+        File firstCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, firstCheckpointFolder, false, 
100, 600000);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.cancelJob(firstJobGraph.getJobID()).get();
+        String firstRestorePath =
+                getLatestCompletedCheckpointPath(firstJobGraph.getJobID(), 
miniCluster).get();
+
+        // 1st restore, switch from disable to enable
+        File secondCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env2 =
+                getEnv(delegatedStateBackend, secondCheckpointFolder, true, 
100, 60000);
+        JobGraph secondJobGraph = buildJobGraph(env2);
+        setSavepointRestoreSettings(secondJobGraph, firstRestorePath);
+
+        miniCluster.submitJob(secondJobGraph).get();
+        waitForAllTaskRunning(miniCluster, secondJobGraph.getJobID(), true);
+        miniCluster.triggerCheckpoint(secondJobGraph.getJobID()).get();
+        miniCluster.cancelJob(secondJobGraph.getJobID()).get();
+        String secondRestorePath =
+                getLatestCompletedCheckpointPath(secondJobGraph.getJobID(), 
miniCluster).get();
+
+        // 2nd restore, private state of first restore checkpoint still exist.
+        File thirdCheckpointFolder = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env3 =
+                getEnv(delegatedStateBackend, thirdCheckpointFolder, true, 
100, 100);
+        JobGraph thirdJobGraph = buildJobGraph(env3);
+        setSavepointRestoreSettings(thirdJobGraph, secondRestorePath);

Review Comment:
   There's no guarantee that private state is still needed - it could be that 
the delegated state backend materialized with completely new private state, 
right? (just asking)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +74,66 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) 
{
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), 
completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, 
Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = 
subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) 
{
+                    try {
+                        LOG.debug("Try to discard checkpoint {}.", 
checkpoint.getCheckpointID());
+                        cleanCheckpoint(

Review Comment:
   I am concerned about `synchronized (subsumedCheckpoints)`:
   `cleanCheckpoint` eventually calls `incrementNumberOfCheckpointsToClean` 
which is synchronized on `this`. Synchronizing on a single object would make 
reasoning easier and less error-prone, WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to