rkhachatryan commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r881739752
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ########## @@ -50,6 +54,9 @@ public class CheckpointsCleaner implements Serializable, AutoCloseableAsync { @Nullable private CompletableFuture<Void> cleanUpFuture; + /** All subsumed checkpoints. */ + private Map<Long, CompletedCheckpoint> subsumedCheckpoints = new HashMap<>(); Review Comment: IDE suggests that this field can be final. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java: ########## @@ -57,13 +63,30 @@ public interface SharedStateRegistry extends AutoCloseable { */ StreamStateHandle registerReference( SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID); + /** * Unregister state that is not referenced by the given checkpoint ID or any newer. * * @param lowestCheckpointID which is still valid */ + @VisibleForTesting void unregisterUnusedState(long lowestCheckpointID); Review Comment: The method is still used by production code, so the annotation is unnecessary. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) { subsumed.add(entry.stateHandle); } it.remove(); + } else { + markCheckpointInUseAction.accept(entry.createdByCheckpointID); } } } - LOG.trace("Discard {} state asynchronously", subsumed.size()); for (StreamStateHandle handle : subsumed) { scheduleAsyncDelete(handle); } } + @Override + public void unregisterUnusedState(long lowestCheckpointID) { + // only used for some tests + unregisterState(lowestCheckpointID, (x) -> {}); + } + + @Override + public void unregisterUnusedStateAndCheckpoint( + long lowestCheckpointID, + CheckpointsCleaner checkpointsCleaner, + Runnable postCleanUp, + @Nullable Executor executor) { + Set<Long> checkpointInUse = new HashSet<>(); + unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id)); + if (executor != null) { + checkpointsCleaner.cleanSubsumedCheckpoints( + lowestCheckpointID, checkpointInUse, postCleanUp, executor); + } else { + checkpointsCleaner.cleanSubsumedCheckpoints( + lowestCheckpointID, checkpointInUse, postCleanUp, asyncDisposalExecutor); Review Comment: What do you think about returning `checkpointInUse` from this function (`unregisterUnusedState`) and calling `checkpointsCleaner.cleanSubsumedCheckpoints` outside (i.e. in `CompletedCheckpointStore`)? That way, `SharedStateRegistry` is kept unaware of checkpoint cleanup. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ########## @@ -107,6 +144,7 @@ private void maybeCompleteCloseUnsafe() { if (numberOfCheckpointsToClean == 0 && cleanUpFuture != null) { cleanUpFuture.complete(null); } + subsumedCheckpoints.clear(); Review Comment: Shouldn't this statement be placed under `if`? Otherwise, it will clear `subsumedCheckpoints` after every checkpoint cleanup. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ########## @@ -65,6 +72,36 @@ public void cleanCheckpoint( cleanup(checkpoint, discardObject::discard, postCleanAction, executor); } + public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) { + synchronized (subsumedCheckpoints) { + subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint); + } + } + + public void cleanSubsumedCheckpoints( + long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) { Review Comment: I think it makes sense to cover this logic by some unit tests. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -156,16 +165,40 @@ public void unregisterUnusedState(long lowestCheckpointID) { subsumed.add(entry.stateHandle); } it.remove(); + } else { + markCheckpointInUseAction.accept(entry.createdByCheckpointID); } } } - LOG.trace("Discard {} state asynchronously", subsumed.size()); for (StreamStateHandle handle : subsumed) { scheduleAsyncDelete(handle); } } + @Override + public void unregisterUnusedState(long lowestCheckpointID) { + // only used for some tests + unregisterState(lowestCheckpointID, (x) -> {}); + } + + @Override + public void unregisterUnusedStateAndCheckpoint( + long lowestCheckpointID, + CheckpointsCleaner checkpointsCleaner, + Runnable postCleanUp, + @Nullable Executor executor) { + Set<Long> checkpointInUse = new HashSet<>(); + unregisterState(lowestCheckpointID, id -> checkpointInUse.add(id)); Review Comment: nit: `unregisterState(lowestCheckpointID, checkpointInUse::add);` ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ########## @@ -65,6 +72,36 @@ public void cleanCheckpoint( cleanup(checkpoint, discardObject::discard, postCleanAction, executor); } + public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) { + synchronized 
(subsumedCheckpoints) { + subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint); + } + } + + public void cleanSubsumedCheckpoints( + long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) { + synchronized (subsumedCheckpoints) { + Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator(); + while (iterator.hasNext()) { + CompletedCheckpoint checkpoint = iterator.next(); + if (checkpoint.getCheckpointID() < upTo + && !stillInUse.contains(checkpoint.getCheckpointID())) { + iterator.remove(); + LOG.trace("Discard checkpoint {}.", checkpoint.getCheckpointID()); Review Comment: `trace` -> `debug`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java: ########## @@ -137,22 +137,21 @@ public CompletedCheckpoint addCheckpointAndSubsumeOldestOne( completedCheckpoints.addLast(checkpoint); + // Remove completed checkpoint from queue and checkpointStateHandleStore, not discard. 
Optional<CompletedCheckpoint> subsume = CheckpointSubsumeHelper.subsume( completedCheckpoints, maxNumberOfCheckpointsToRetain, - completedCheckpoint -> - tryRemoveCompletedCheckpoint( - completedCheckpoint, - completedCheckpoint.shouldBeDiscardedOnSubsume(), - checkpointsCleaner, - postCleanup)); - unregisterUnusedState(completedCheckpoints); - - if (subsume.isPresent()) { - LOG.debug("Added {} to {} without any older checkpoint to subsume.", checkpoint, path); - } else { - LOG.debug("Added {} to {} and subsume {}.", checkpoint, path, subsume); + completedCheckpoint -> { + tryRemove(completedCheckpoint.getCheckpointID()); + checkpointsCleaner.addSubsumedCheckpoint(completedCheckpoint); + }); + + Optional<Long> lowestCheckpointId = findLowest(completedCheckpoints); + if (lowestCheckpointId.isPresent()) { Review Comment: nit: `isPresent` -> `ifPresent` ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java: ########## @@ -65,6 +72,36 @@ public void cleanCheckpoint( cleanup(checkpoint, discardObject::discard, postCleanAction, executor); } + public void addSubsumedCheckpoint(CompletedCheckpoint completedCheckpoint) { + synchronized (subsumedCheckpoints) { + subsumedCheckpoints.put(completedCheckpoint.getCheckpointID(), completedCheckpoint); + } + } + + public void cleanSubsumedCheckpoints( + long upTo, Set<Long> stillInUse, Runnable postCleanAction, Executor executor) { + synchronized (subsumedCheckpoints) { + Iterator<CompletedCheckpoint> iterator = subsumedCheckpoints.values().iterator(); + while (iterator.hasNext()) { + CompletedCheckpoint checkpoint = iterator.next(); + if (checkpoint.getCheckpointID() < upTo + && !stillInUse.contains(checkpoint.getCheckpointID())) { + iterator.remove(); Review Comment: Should we retry deletion in case of failure, by moving `iterator.remove();` under `catch`? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -137,8 +146,8 @@ public StreamStateHandle registerReference( return entry.stateHandle; } - @Override - public void unregisterUnusedState(long lowestCheckpointID) { + private void unregisterState( + long lowestCheckpointID, Consumer<Long> markCheckpointInUseAction) { Review Comment: I think it makes sense to cover the logic that feeds `markCheckpointInUseAction` (i.e. computing the set of checkpoints still in use) with some unit tests. That would be possible if `unregisterUnusedState` returned this set, as suggested in another comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org