rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r881739752


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -50,6 +54,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync {
     @Nullable
     private CompletableFuture<Void> cleanUpFuture;
 
+    /** All subsumed checkpoints. */
+    private Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>();

Review Comment:
   The IDE suggests that this field can be `final`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +63,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID);
+
     /**
      * Unregister state that is not referenced by the given checkpoint ID or any newer.
      *
      * @param lowestCheckpointID which is still valid
      */
+    @VisibleForTesting
     void unregisterUnusedState(long lowestCheckpointID);

Review Comment:
   The method is still used by production code, so the `@VisibleForTesting` annotation is unnecessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        // only used for some tests
+        unregisterState(lowestCheckpointID, (x) -> {});
+    }
+
+    @Override
+    public void unregisterUnusedStateAndCheckpoint(
+            long lowestCheckpointID,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanUp,
+            @Nullable Executor executor) {
+        Set<Long> checkpointInUse = new HashSet<>();
+        unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id));
+        if (executor != null) {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, executor);
+        } else {
+            checkpointsCleaner.cleanSubsumedCheckpoints(
+                    lowestCheckpointID, checkpointInUse, postCleanUp, asyncDisposalExecutor);

Review Comment:
   WDYT about returning the set of in-use checkpoints from this function (`unregisterUnusedState`) and calling `checkpointsCleaner.cleanSubsumedCheckpoints` outside of it (i.e., in `CompletedCheckpointStore`)?
   
   That way, `SharedStateRegistry` stays unaware of checkpoint cleanup.
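
A minimal sketch of that split, assuming hypothetical, trimmed-down interfaces (`InUseReportingRegistry`, `SubsumedCheckpointCleaner` and `StoreSketch` are illustrative names, not Flink's API): the registry only reports which checkpoints are still referenced, and the store forwards that set to the cleaner.

```java
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical, trimmed-down interfaces; not the actual Flink signatures.
interface InUseReportingRegistry {
    /** Unregisters unused shared state and returns the IDs of checkpoints still referenced. */
    Set<Long> unregisterUnusedState(long lowestCheckpointID);
}

interface SubsumedCheckpointCleaner {
    void cleanSubsumedCheckpoints(
            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor);
}

/** Stand-in for the store: the only component that talks to both collaborators. */
final class StoreSketch {
    private final InUseReportingRegistry registry;
    private final SubsumedCheckpointCleaner cleaner;
    private final Executor ioExecutor;

    StoreSketch(
            InUseReportingRegistry registry,
            SubsumedCheckpointCleaner cleaner,
            Executor ioExecutor) {
        this.registry = registry;
        this.cleaner = cleaner;
        this.ioExecutor = ioExecutor;
    }

    /** Called after subsumption with the ID of the oldest retained checkpoint. */
    void cleanUpAfterSubsume(long lowestRetainedCheckpointID, Runnable postCleanup) {
        // The registry only reports what is still referenced...
        Set<Long> stillInUse = registry.unregisterUnusedState(lowestRetainedCheckpointID);
        // ...and the store decides what to do with that information.
        cleaner.cleanSubsumedCheckpoints(
                lowestRetainedCheckpointID, stillInUse, postCleanup, ioExecutor);
    }
}
```

With this shape, the registry can be tested on its own by checking the returned set, and the store is the only place that knows about both collaborators.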



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -107,6 +144,7 @@ private void maybeCompleteCloseUnsafe() {
         if (numberOfCheckpointsToClean == 0 && cleanUpFuture != null) {
             cleanUpFuture.complete(null);
         }
+        subsumedCheckpoints.clear();

Review Comment:
   Shouldn't this statement be placed inside the `if` block? 
   Otherwise, won't it clear `subsumedCheckpoints` after every checkpoint cleanup, not only when closing?
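
A simplified sketch of the suggested placement, assuming a hypothetical `CloseAwareCleaner` that models only the close bookkeeping (pending-cleanup counter, close future, subsumed map) and not the real `CheckpointsCleaner`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the close bookkeeping only; not Flink's CheckpointsCleaner.
final class CloseAwareCleaner {
    private final Object lock = new Object();
    private final Map<Long, String> subsumedCheckpoints = new HashMap<>();
    private int numberOfCheckpointsToClean;
    // Non-null once close has been requested.
    private CompletableFuture<Void> cleanUpFuture;

    void addSubsumedCheckpoint(long checkpointId) {
        synchronized (lock) {
            subsumedCheckpoints.put(checkpointId, "checkpoint-" + checkpointId);
        }
    }

    void onCleanupStarted() {
        synchronized (lock) {
            numberOfCheckpointsToClean++;
        }
    }

    void onCleanupFinished() {
        synchronized (lock) {
            numberOfCheckpointsToClean--;
            maybeCompleteCloseUnsafe();
        }
    }

    CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            if (cleanUpFuture == null) {
                cleanUpFuture = new CompletableFuture<>();
            }
            maybeCompleteCloseUnsafe();
            return cleanUpFuture;
        }
    }

    int subsumedCount() {
        synchronized (lock) {
            return subsumedCheckpoints.size();
        }
    }

    // Must be called while holding `lock`.
    private void maybeCompleteCloseUnsafe() {
        if (numberOfCheckpointsToClean == 0 && cleanUpFuture != null) {
            cleanUpFuture.complete(null);
            // Inside the `if`: an ordinary cleanup finishing does not drop
            // checkpoints that are still waiting to be cleaned.
            subsumedCheckpoints.clear();
        }
    }
}
```

Finishing an ordinary cleanup leaves the subsumed checkpoint tracked; only `closeAsync()` with nothing left to clean completes the future and clears the map.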



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {

Review Comment:
   I think it makes sense to cover this logic with some unit tests.
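
For instance, a test could pin down which checkpoints are selected for discarding. The sketch below assumes a hypothetical `SubsumedCheckpointsSketch` that mirrors the selection condition from the diff (`id < upTo && !stillInUse.contains(id)`) but tracks plain IDs and takes a discard callback instead of `CompletedCheckpoint` objects:

```java
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executor;
import java.util.function.LongConsumer;

// Hypothetical stand-in mirroring the selection logic of cleanSubsumedCheckpoints,
// using plain checkpoint IDs and a discard callback instead of CompletedCheckpoint.
final class SubsumedCheckpointsSketch {
    private final Set<Long> subsumed = new TreeSet<>();
    private final LongConsumer discard;

    SubsumedCheckpointsSketch(LongConsumer discard) {
        this.discard = discard;
    }

    void addSubsumedCheckpoint(long checkpointId) {
        synchronized (subsumed) {
            subsumed.add(checkpointId);
        }
    }

    void cleanSubsumedCheckpoints(
            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
        synchronized (subsumed) {
            Iterator<Long> iterator = subsumed.iterator();
            while (iterator.hasNext()) {
                long id = iterator.next();
                // Same condition as in the diff: older than upTo and not referenced.
                if (id < upTo && !stillInUse.contains(id)) {
                    iterator.remove();
                    executor.execute(
                            () -> {
                                discard.accept(id);
                                postCleanAction.run();
                            });
                }
            }
        }
    }

    Set<Long> remaining() {
        synchronized (subsumed) {
            return new TreeSet<>(subsumed);
        }
    }
}
```

With a direct executor (`Runnable::run`), subsumed IDs 1 to 4, `upTo = 4` and `stillInUse = {2}`, only 1 and 3 should be discarded, leaving 2 (still in use) and 4 (not below `upTo`).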



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        // only used for some tests
+        unregisterState(lowestCheckpointID, (x) -> {});
+    }
+
+    @Override
+    public void unregisterUnusedStateAndCheckpoint(
+            long lowestCheckpointID,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanUp,
+            @Nullable Executor executor) {
+        Set<Long> checkpointInUse = new HashSet<>();
+        unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id));

Review Comment:
   nit: `unregisterState(lowestCheckpointID, checkpointInUse::add);`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    iterator.remove();
+                    LOG.trace("Discard checkpoint {}.", checkpoint.getCheckpointID());

Review Comment:
   `trace` -> `debug`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java:
##########
@@ -137,22 +137,21 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
 
         completedCheckpoints.addLast(checkpoint);
 
+        // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard.
         Optional<CompletedCheckpoint> subsume =
                 CheckpointSubsumeHelper.subsume(
                         completedCheckpoints,
                         maxNumberOfCheckpointsToRetain,
-                        completedCheckpoint ->
-                                tryRemoveCompletedCheckpoint(
-                                        completedCheckpoint,
-                                        completedCheckpoint.shouldBeDiscardedOnSubsume(),
-                                        checkpointsCleaner,
-                                        postCleanup));
-        unregisterUnusedState(completedCheckpoints);
-
-        if (subsume.isPresent()) {
-            LOG.debug("Added {} to {} without any older checkpoint to subsume.", checkpoint, path);
-        } else {
-            LOG.debug("Added {} to {} and subsume {}.", checkpoint, path, subsume);
+                        completedCheckpoint -> {
+                            tryRemove(completedCheckpoint.getCheckpointID());
+                            checkpointsCleaner.addSubsumedCheckpoint(completedCheckpoint);
+                        });
+
+        Optional<Long> lowestCheckpointId = findLowest(completedCheckpoints);
+        if (lowestCheckpointId.isPresent()) {

Review Comment:
   nit: `isPresent` -> `ifPresent`
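
A sketch of the `ifPresent` form, assuming a hypothetical `findLowest` over plain IDs and a hypothetical `cleanUpBelow` callback standing in for whatever the `if` body does in the actual change:

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.LongConsumer;

final class IfPresentSketch {
    // Hypothetical stand-in for findLowest: the smallest retained checkpoint ID, if any.
    static Optional<Long> findLowest(Collection<Long> retainedCheckpointIds) {
        return retainedCheckpointIds.stream().min(Long::compare);
    }

    // `cleanUpBelow` is a hypothetical placeholder for the body of the original `if`.
    static void afterSubsume(Collection<Long> retainedCheckpointIds, LongConsumer cleanUpBelow) {
        // Replaces: if (lowest.isPresent()) { ... lowest.get() ... }
        findLowest(retainedCheckpointIds).ifPresent(cleanUpBelow::accept);
    }
}
```

When no checkpoint is retained, `ifPresent` simply does nothing, so the explicit `isPresent`/`get` pair is not needed.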



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java:
##########
@@ -65,6 +72,36 @@ public void cleanCheckpoint(
         cleanup(checkpoint, discardObject::discard, postCleanAction, executor);
     }
 
+    public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) {
+        synchronized (subsumedCheckpoints) {
+            subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint);
+        }
+    }
+
+    public void cleanSubsumedCheckpoints(
+            long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) {
+        synchronized (subsumedCheckpoints) {
+            Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator();
+            while (iterator.hasNext()) {
+                CompletedCheckpoint checkpoint = iterator.next();
+                if (checkpoint.getCheckpointID() < upTo
+                        && !stillInUse.contains(checkpoint.getCheckpointID())) {
+                    iterator.remove();

Review Comment:
   Should we retry deletion in case of failure, by moving `iterator.remove();` under `catch`?
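
One possible shape for retry-on-failure, sketched under assumptions (hypothetical `RetryingSubsumedCleaner` and `Discardable` types; discards run on the given executor): the checkpoint is removed from the map before its discard is scheduled, and the `catch` puts it back so that the next `cleanSubsumed` call retries it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical sketch of retry-on-failure; not the actual CheckpointsCleaner code.
final class RetryingSubsumedCleaner {
    interface Discardable {
        long checkpointId();

        void discard() throws Exception;
    }

    private final Map<Long, Discardable> subsumed = new HashMap<>();

    void add(Discardable checkpoint) {
        synchronized (subsumed) {
            subsumed.put(checkpoint.checkpointId(), checkpoint);
        }
    }

    void cleanSubsumed(long upTo, Set<Long> stillInUse, Executor executor) {
        List<Discardable> toDiscard = new ArrayList<>();
        synchronized (subsumed) {
            Iterator<Discardable> iterator = subsumed.values().iterator();
            while (iterator.hasNext()) {
                Discardable checkpoint = iterator.next();
                if (checkpoint.checkpointId() < upTo
                        && !stillInUse.contains(checkpoint.checkpointId())) {
                    iterator.remove();
                    toDiscard.add(checkpoint);
                }
            }
        }
        // Discards are scheduled outside the lock.
        for (Discardable checkpoint : toDiscard) {
            executor.execute(
                    () -> {
                        try {
                            checkpoint.discard();
                        } catch (Exception e) {
                            // Re-register the checkpoint so the next call retries it.
                            add(checkpoint);
                        }
                    });
        }
    }

    int pendingCount() {
        synchronized (subsumed) {
            return subsumed.size();
        }
    }
}
```

One caveat of this design: a failed checkpoint is retried only on the next `cleanSubsumed` call, not immediately.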



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -137,8 +146,8 @@ public StreamStateHandle registerReference(
         return entry.stateHandle;
     }
 
-    @Override
-    public void unregisterUnusedState(long lowestCheckpointID) {
+    private void unregisterState(
+            long lowestCheckpointID, Consumer<Long> markCheckpointInUseAction) {

Review Comment:
   I think it makes sense to cover the logic that determines which checkpoints are passed to `markCheckpointInUseAction` with some unit tests.
   
   This would be possible if `unregisterUnusedState` returned this set, as suggested in another comment.
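
A sketch of what such a test could target, assuming a hypothetical, simplified registry whose entries track `createdByCheckpointID` and `lastUsedCheckpointID`, and assuming an entry is unused once `lastUsedCheckpointID < lowestCheckpointID` (the exact condition is not visible in this hunk):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified registry; the real SharedStateRegistryImpl also tracks
// state handles and schedules their asynchronous deletion.
final class InUseTrackingRegistry {
    private static final class Entry {
        final long createdByCheckpointID;
        long lastUsedCheckpointID;

        Entry(long checkpointID) {
            this.createdByCheckpointID = checkpointID;
            this.lastUsedCheckpointID = checkpointID;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    void registerReference(String key, long checkpointID) {
        Entry entry = entries.computeIfAbsent(key, k -> new Entry(checkpointID));
        entry.lastUsedCheckpointID = Math.max(entry.lastUsedCheckpointID, checkpointID);
    }

    /** Drops unused entries and returns the checkpoints that created the remaining ones. */
    Set<Long> unregisterUnusedState(long lowestCheckpointID) {
        Set<Long> checkpointsInUse = new HashSet<>();
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.lastUsedCheckpointID < lowestCheckpointID) {
                it.remove(); // the real registry would also schedule the state for deletion
            } else {
                checkpointsInUse.add(entry.createdByCheckpointID);
            }
        }
        return checkpointsInUse;
    }
}
```

Registering `a` at 1 and 3, `b` at 2, and `c` at 3, then calling `unregisterUnusedState(3)` drops `b` and returns `{1, 3}`: the checkpoints that created the still-referenced entries `a` and `c`.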



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
