Thanks Matthias for continuously improving the clean-up process. Given that we depend heavily on the K8s APIServer for the HA implementation, I am not in favor of storing too many entries in the ConfigMap or sending even more update requests to the APIServer. So I lean towards Proposal #2. It effectively means reverting the current mark-for-deletion logic in the StateHandleStore and introducing a completely new FileSystem-based artifact clean-up mechanism.
When doing a failover, I suggest processing the clean-up asynchronously (a sketch follows below). Otherwise, listing the completed checkpoints and deleting the invalid ones will take too much time and slow down the recovery process.

Best,
Yang
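To make the asynchronous clean-up concrete, here is a minimal sketch (the names AsyncCheckpointsCleaner and DiscardableCheckpoint are made up for illustration, not Flink's actual API): recovery only enqueues the discard work on an I/O executor, so the main thread is not blocked by listing or deleting files.

import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical handle for a checkpoint whose artifacts await removal. */
interface DiscardableCheckpoint {
    /** Deletes the checkpoint's artifacts; should be safe to repeat. */
    void discard() throws Exception;
}

/** Sketch: hand discards to an I/O executor instead of the main thread. */
final class AsyncCheckpointsCleaner {
    private final Executor ioExecutor;

    AsyncCheckpointsCleaner(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    /** Called during failover with the checkpoints recovered for deletion. */
    void cleanUpAsync(List<DiscardableCheckpoint> toDelete) {
        for (DiscardableCheckpoint checkpoint : toDelete) {
            // Recovery only enqueues work; the actual file listing and
            // deletion happen on the I/O executor.
            ioExecutor.execute(() -> {
                try {
                    checkpoint.discard();
                } catch (Exception e) {
                    // Dropping the failure would lose the clean-up; a real
                    // version would re-enqueue it (see the retry sketch
                    // further below).
                }
            });
        }
    }
}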
Matthias Pohl <matthias.p...@aiven.io.invalid> wrote on Thu, Oct 27, 2022, 20:20:

> I would like to bring this topic up one more time. I put some more thought
> into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> updated version of what I summarized in my previous email. It would be
> interesting to get some additional perspectives on this; more specifically,
> on the two included proposals: either just repurposing the
> CompletedCheckpointStore into a more generic CheckpointStore, or
> refactoring the StateHandleStore interface and moving all the cleanup logic
> from the CheckpointsCleaner and StateHandleStore into what's currently
> called the CompletedCheckpointStore.
>
> Looking forward to feedback on that proposal.
>
> Best,
> Matthias
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <matthias.p...@aiven.io> wrote:
>
> > Hi everyone,
> >
> > I'd like to start a discussion on repeatable cleanup of checkpoint data.
> > In FLIP-194 [1] we introduced repeatable cleanup of HA data along with
> > the introduction of the JobResultStore component. The goal was to put
> > Flink in charge of cleaning up the data it owns. The Flink cluster should
> > only shut down gracefully after all of its artifacts have been removed.
> > That way, no abandoned artifacts would be missed accidentally.
> >
> > We forgot to cover one code path around cleaning up checkpoint data.
> > Currently, in case of an error (e.g. permission issues), the
> > CheckpointsCleaner tries to clean up checkpoints and, if that cleanup
> > fails, leaves them behind and only prints a log message. The user would
> > then be responsible for cleaning up the data. This was discussed as part
> > of the release testing efforts for Flink 1.15 in FLINK-26388 [2].
> >
> > We could add repeatable cleanup in the CheckpointsCleaner. We would have
> > to make sure that all StateObject#discardState implementations are
> > idempotent. This is not necessarily the case right now (see FLINK-26606
> > [3] and the idempotency sketch after the quoted thread).
> >
> > Additionally, there is the problem of losing information about which
> > Checkpoints are subject to cleanup in case of JobManager failovers. These
> > Checkpoints are not stored as part of the HA data, and PendingCheckpoints
> > are not serialized in any way, either. None of these artifacts are picked
> > up again after a failover. I see the following options here:
> >
> > - The purpose of the CompletedCheckpointStore needs to be extended to
> > become a "general" CheckpointStore. It would store PendingCheckpoints and
> > CompletedCheckpoints that are marked for deletion. After a failover, the
> > CheckpointsCleaner could pick up these instances again and continue with
> > the deletion process.
> >
> > The flaw of that approach is that we're increasing the amount of data
> > that is stored in the underlying StateHandleStore. Additionally, we're
> > going to have an increased number of accesses to the
> > CompletedCheckpointStore. These accesses need to happen in the main
> > thread; more specifically, adding PendingCheckpoints and marking
> > Checkpoints for deletion.
> >
> > - We're actually interested in cleaning up artifacts from the FileSystem,
> > i.e. the artifacts created by the StateHandleStore used within the
> > DefaultCompletedCheckpointStore (containing the serialized
> > CompletedCheckpoint instance) and the checkpoint's folder containing the
> > actual operator states. We could adapt the CompletedCheckpointStore in a
> > way that any Checkpoint (including PendingCheckpoints) is serialized and
> > persisted on the FileSystem right away (which is currently done within
> > the StateHandleStore implementations when adding CompletedCheckpoints to
> > the underlying HA system). The corresponding FileStateHandleObject
> > (referring to that serialized CompletedCheckpoint) that gets persisted to
> > ZooKeeper/the k8s ConfigMap in the end would only be written once the
> > CompletedCheckpoint is finalized and can be used. The CheckpointsCleaner
> > could recover any artifacts from the FileSystem and clean up anything
> > that's not listed in ZooKeeper/the k8s ConfigMap (see the diff-based
> > clean-up sketch after the quoted thread).
> >
> > This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
> > but would still require additional IO on the FileSystem. It would also
> > require some larger refactoring: the RetrievableStateHandle instances
> > currently handled internally (i.e. in the StateHandleStore) would need to
> > become public.
> >
> > - We just add repeatable cleanup to the CheckpointsCleaner as is (see the
> > retry sketch at the end). No cleanup would be picked up again after
> > recovery. This could be a fallback for the user to reduce IO.
> >
> > I'm interested in initial opinions from the community on that matter
> > before starting to work on a final FLIP.
> >
> > Best,
> >
> > Matthias
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> > [2] https://issues.apache.org/jira/browse/FLINK-26388
> > [3] https://issues.apache.org/jira/browse/FLINK-26606
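Two details from the quoted discussion lend themselves to short sketches. First, the idempotency requirement on StateObject#discardState (FLINK-26606): the pattern below uses a made-up FileBackedStateHandle rather than Flink's real state handle classes; the only point is that "already deleted" counts as success, so a repeated clean-up attempt cannot fail on work a previous attempt already finished.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-backed state handle with an idempotent discard. */
final class FileBackedStateHandle {
    private final Path filePath;

    FileBackedStateHandle(Path filePath) {
        this.filePath = filePath;
    }

    /**
     * Deletes the backing file. Idempotent: calling it again after a
     * partial or completed earlier attempt succeeds instead of throwing.
     */
    void discardState() throws IOException {
        // deleteIfExists returns false when the file is already gone,
        // which is exactly the "previous attempt succeeded" case.
        Files.deleteIfExists(filePath);
    }
}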
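Second, proposal #2's clean-up boils down to a diff between the FileSystem and the HA backend: anything under the job's checkpoint directory that ZooKeeper/the k8s ConfigMap does not (or no longer) reference is garbage. A rough sketch with hypothetical names (OrphanedCheckpointCleaner, referencedDirs); a real implementation would additionally have to avoid racing with a checkpoint that is still being written.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Stream;

/** Sketch of proposal #2: diff FileSystem artifacts against the HA store. */
final class OrphanedCheckpointCleaner {

    /**
     * @param checkpointBaseDir the job's checkpoint directory
     * @param referencedDirs directory names the HA backend (ZooKeeper/k8s
     *     ConfigMap) still points to, i.e. finalized, usable checkpoints
     */
    void cleanUpOrphans(Path checkpointBaseDir, Set<String> referencedDirs)
            throws IOException {
        try (Stream<Path> children = Files.list(checkpointBaseDir)) {
            for (Path dir : (Iterable<Path>) children::iterator) {
                // Anything the HA store does not reference is either a
                // pending checkpoint that was never finalized or a
                // checkpoint marked for deletion: remove it.
                if (!referencedDirs.contains(dir.getFileName().toString())) {
                    deleteRecursively(dir);
                }
            }
        }
    }

    private static void deleteRecursively(Path dir) throws IOException {
        // Walk depth-first (children before parents) so directories are
        // empty by the time they are deleted.
        try (Stream<Path> tree = Files.walk(dir)) {
            for (Path p :
                    (Iterable<Path>)
                            tree.sorted(Comparator.reverseOrder())::iterator) {
                Files.deleteIfExists(p);
            }
        }
    }
}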
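Finally, proposal #3 (repeatable cleanup in the CheckpointsCleaner as is) is essentially a retry loop around the discard instead of a single attempt plus a log message. A sketch with a fixed retry delay and made-up names; it presumes the idempotent discard above and, unlike a production version, neither caps the number of attempts nor classifies permanent errors such as permission issues.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: keep retrying a failed checkpoint discard instead of giving up. */
final class RetryingDiscarder {

    /** The clean-up action, e.g. a call to an idempotent discardState. */
    interface Discard {
        void run() throws Exception;
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Runs the discard, rescheduling it with a fixed delay on failure. */
    CompletableFuture<Void> discardWithRetry(Discard discard, long delayMillis) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        scheduler.execute(() -> attempt(discard, delayMillis, done));
        return done;
    }

    private void attempt(
            Discard discard, long delayMillis, CompletableFuture<Void> done) {
        try {
            discard.run();
            done.complete(null);
        } catch (Exception e) {
            // Retrying is only safe because the discard is idempotent; a
            // production version would also bound the number of attempts.
            scheduler.schedule(
                    () -> attempt(discard, delayMillis, done),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        }
    }
}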