Hi everyone,

I’d like to start a discussion on repeatable cleanup of checkpoint data. In
FLIP-194 [1] we introduced repeatable cleanup of HA data along with the
introduction of the JobResultStore component. The goal was to put Flink in
charge of cleaning up the data it owns. The Flink cluster should only shut
down gracefully after all its artifacts are removed. That way, no abandoned
artifacts are left behind unnoticed.

We forgot to cover one code path around cleaning up checkpoint data.
Currently, the CheckpointsCleaner tries to clean up a checkpoint only once.
If that attempt fails (e.g. due to permission issues), a log message is
printed and the checkpoint data is left behind. The user is then responsible
for cleaning up the data. This was discussed as part of the release testing
efforts for Flink 1.15 in FLINK-26388 [2].

We could add repeatable cleanup in the CheckpointsCleaner. We would have to
make sure that all StateObject#discardState implementations are idempotent.
This is not necessarily the case right now (see FLINK-26606 [3]).
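
To illustrate what "repeatable" and "idempotent" mean here, below is a
minimal sketch. It is not Flink code: RepeatableCleanupSketch,
discardDirectory and cleanupWithRetry are hypothetical names, and it uses
plain java.nio on a local directory instead of Flink's FileSystem and
StateObject#discardState. The point is only that a discard which treats
"already gone" as success can safely be retried.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink): retries an idempotent discard. */
final class RepeatableCleanupSketch {

    /** Idempotent discard: a directory that is already gone counts as success. */
    static void discardDirectory(Path dir) throws IOException {
        if (Files.notExists(dir)) {
            return; // nothing left to do, e.g. a previous attempt already succeeded
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order so that children are deleted before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.deleteIfExists(p);
        }
    }

    /** Retries the discard up to maxAttempts times; returns true on success. */
    static boolean cleanupWithRetry(Path dir, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                discardDirectory(dir);
                return true;
            } catch (IOException e) {
                // Assumption for this sketch: every IOException is retryable.
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        return false;
    }
}
```

Calling cleanupWithRetry a second time on the same directory returns true
without doing any work, which is the property every discardState
implementation would need.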

Additionally, there is the problem of losing the information about which
checkpoints are subject to cleanup when the JobManager fails over. These
checkpoints are not stored as part of the HA data, and PendingCheckpoints
are not serialized in any way either. None of these artifacts are picked up
again after a failover. I see the following options here:

   - The purpose of CompletedCheckpointStore needs to be extended to become a
     “general” CheckpointStore. It will store PendingCheckpoints and
     CompletedCheckpoints that are marked for deletion. After a failover,
     CheckpointsCleaner can pick up these instances again and continue with
     the deletion process.

The flaw of that approach is that we’re increasing the amount of data that
is stored in the underlying StateHandleStore. Additionally, we’re going to
have an increased number of accesses to the CompletedCheckpointStore. These
accesses (more specifically, adding PendingCheckpoints and marking
checkpoints for deletion) need to happen in the main thread.

   - We’re actually interested in cleaning up artifacts from the FileSystem,
     i.e. the artifacts created by the StateHandleStore used within the
     DefaultCompletedCheckpointStore (containing the serialized
     CompletedCheckpoint instance) and the checkpoint’s folder containing the
     actual operator states. We could adapt the CompletedCheckpointStore in a
     way that any checkpoint (including PendingCheckpoints) is serialized and
     persisted on the FileSystem right away. Currently, this is done within
     the StateHandleStore implementations when adding CompletedCheckpoints to
     the underlying HA system. The corresponding FileStateHandleObject
     (referring to that serialized CompletedCheckpoint) that gets persisted
     to ZooKeeper/k8s ConfigMap in the end would only be written once the
     CompletedCheckpoint is finalized and can be used. The CheckpointsCleaner
     could recover any artifacts from the FileSystem and clean up anything
     that’s not listed in ZooKeeper/k8s ConfigMap.

This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
but would still require additional IO on the FileSystem. It would also
require some larger refactoring: the RetrievableStateHandle, which is
currently handled internally (i.e. in the StateHandleStore), would need to
become public.

   - We just add repeatable cleanup to the CheckpointsCleaner as is. No
     cleanup is picked up again after recovery. This could be a fallback for
     the user to reduce IO.
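
For the second option, the recovery step boils down to a set difference:
everything found on the FileSystem minus everything referenced in
ZooKeeper/k8s ConfigMap. A rough sketch of that idea, under loud
assumptions: OrphanedCheckpointScanSketch and findOrphans are hypothetical
names, the directory names in the usage note are only illustrative, and
java.nio stands in for Flink's FileSystem abstraction.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink): finds artifacts the HA backend does not know. */
final class OrphanedCheckpointScanSketch {

    /**
     * Returns the names of all entries directly under checkpointRoot that are
     * not contained in referencedInHa (the names listed in the HA backend).
     */
    static Set<String> findOrphans(Path checkpointRoot, Set<String> referencedInHa)
            throws IOException {
        Set<String> orphans = new TreeSet<>();
        try (Stream<Path> entries = Files.list(checkpointRoot)) {
            entries.map(p -> p.getFileName().toString())
                    .filter(name -> !referencedInHa.contains(name))
                    .forEach(orphans::add);
        }
        return orphans;
    }
}
```

With entries chk-1, chk-2 and chk-3 on disk and only chk-3 referenced in the
HA backend, findOrphans would return chk-1 and chk-2 as cleanup candidates.
A real implementation would additionally have to exclude checkpoints that
are still in progress, since those are not listed in the HA backend either.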


I’m interested in initial opinions from the community on that matter before
starting to work on a final FLIP.

Best,

Matthias



[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore

[2] https://issues.apache.org/jira/browse/FLINK-26388

[3] https://issues.apache.org/jira/browse/FLINK-26606
