Hi everyone,

I'd like to start a discussion on repeatable cleanup of checkpoint data. In FLIP-194 [1] we introduced repeatable cleanup of HA data alongside the introduction of the JobResultStore component. The goal was to put Flink in charge of cleaning up the data it owns: the Flink cluster should only shut down gracefully after all of its artifacts have been removed. That way, no abandoned artifacts are accidentally left behind.
We forgot to cover one code path around cleaning up checkpoint data. Currently, in case of an error (e.g. permission issues), the CheckpointsCleaner attempts to clean up checkpoints once and, if that cleanup fails, leaves them as they are. A log message is printed and the user is responsible for cleaning up the data manually. This was discussed as part of the release testing efforts for Flink 1.15 in FLINK-26388 [2].

We could add repeatable cleanup to the CheckpointsCleaner. For that, we would have to make sure that all StateObject#discardState implementations are idempotent, which is not necessarily the case right now (see FLINK-26606 [3]; a rough sketch of what I mean by idempotent is attached below the references).

Additionally, there is the problem of losing the information about which checkpoints are subject to cleanup in case of JobManager failovers: these checkpoints are not stored as part of the HA data, and PendingCheckpoints are not serialized in any way, either. None of these artifacts are picked up again after a failover.

I see the following options here:

- Extend the purpose of the CompletedCheckpointStore to become a "general" CheckpointStore. It would store PendingCheckpoints as well as CompletedCheckpoints that are marked for deletion. After a failover, the CheckpointsCleaner could pick up these instances again and continue with the deletion process. The flaw of that approach is that we would increase the amount of data stored in the underlying StateHandleStore. Additionally, we would have an increased number of accesses to the CompletedCheckpointStore; these accesses (adding PendingCheckpoints and marking checkpoints for deletion) need to happen in the main thread.

- We are actually interested in cleaning up artifacts from the FileSystem, i.e. the artifacts created by the StateHandleStore used within the DefaultCompletedCheckpointStore (containing the serialized CompletedCheckpoint instance) and the checkpoint's folder containing the actual operator states. We could adapt the CompletedCheckpointStore so that any checkpoint (including PendingCheckpoints) is serialized and persisted on the FileSystem right away (which currently happens within the StateHandleStore implementations when CompletedCheckpoints are added to the underlying HA system). The corresponding FileStateHandleObject (referring to that serialized CompletedCheckpoint) would only be written to ZooKeeper/the k8s ConfigMap once the CompletedCheckpoint is finalized and can be used. The CheckpointsCleaner could then recover any artifacts from the FileSystem and clean up anything that is not listed in ZooKeeper/the k8s ConfigMap. This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s) but would still require additional IO on the FileSystem. It would also require some larger refactoring: the RetrievableStateHandle, which is currently handled internally (i.e. in the StateHandleStore), would need to become public.

- We just add repeatable cleanup to the CheckpointsCleaner as is, without picking up any pending cleanup again after recovery. This could also serve as a fallback mode for users who want to reduce IO. (A sketch of such a retry loop is attached below the references as well.)

I'm interested in initial opinions from the community on that matter before starting to work on a final FLIP.

Best,
Matthias

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
[2] https://issues.apache.org/jira/browse/FLINK-26388
[3] https://issues.apache.org/jira/browse/FLINK-26606
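
Appendix (rough sketches, not actual Flink code):

(a) What I mean by an idempotent StateObject#discardState: a second call on state that is already gone should succeed instead of failing, so that a retrying cleaner can call it any number of times. The FileBackedStateHandle name and its path field below are made up purely for illustration:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical file-backed state handle, only to illustrate idempotent discard semantics.
    final class FileBackedStateHandle {

        private final Path path;

        FileBackedStateHandle(Path path) {
            this.path = path;
        }

        // Idempotent discard: state that was already deleted is treated as success,
        // not as an error, so repeated cleanup attempts are safe.
        void discardState() throws IOException {
            // deleteIfExists returns false if the file is already gone; that is fine here.
            Files.deleteIfExists(path);
        }
    }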
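(b) A similarly simplified sketch of what "repeatable cleanup" in the CheckpointsCleaner could look like, assuming idempotent discards. The RetryingCleanup name, the fixed-delay retry policy and the executor wiring are made up; an actual implementation would likely reuse the retry utilities we already have from the FLIP-194 work:

    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper that keeps retrying a cleanup action until it succeeds,
    // instead of giving up after the first failed attempt.
    final class RetryingCleanup {

        @FunctionalInterface
        interface ThrowingRunnable {
            void run() throws Exception;
        }

        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        CompletableFuture<Void> cleanupWithRetries(ThrowingRunnable cleanup, Duration retryDelay) {
            CompletableFuture<Void> result = new CompletableFuture<>();
            executor.execute(() -> attempt(cleanup, retryDelay, result));
            return result;
        }

        private void attempt(ThrowingRunnable cleanup, Duration retryDelay, CompletableFuture<Void> result) {
            try {
                cleanup.run();
                result.complete(null);
            } catch (Exception e) {
                // Log and reschedule rather than leaving the artifact behind.
                executor.schedule(
                        () -> attempt(cleanup, retryDelay, result),
                        retryDelay.toMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }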