Thanks Matthias for continuously improving the clean-up process.

Given that we depend heavily on the K8s APIServer for the HA
implementation, I am not in favor of storing too many entries in the
ConfigMap, nor of adding more update requests to the APIServer. So I lean
towards Proposal #2. It effectively reverts the current mark-for-deletion
logic in StateHandleStore and introduces a completely new FileSystem-based
artifact clean-up mechanism.

During failover, I suggest processing the clean-up asynchronously.
Otherwise, listing the completed checkpoints and deleting the invalid ones
would take too much time and slow down the recovery process.
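To make the asynchronous suggestion concrete, here is a minimal sketch (all names are hypothetical and simplified; Flink's actual CheckpointsCleaner and recovery path look different): the recovery code hands the stale-checkpoint scan off to a dedicated I/O executor and returns immediately, so the main recovery thread is never blocked on FileSystem listing or deletion.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: clean up unreferenced checkpoint directories off
// the recovery path so failover is not slowed down by FileSystem I/O.
public class AsyncCheckpointCleanup {

    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

    /** Kicks off cleanup asynchronously and returns immediately. */
    public CompletableFuture<Integer> cleanUpAsync(List<String> checkpointDirs,
                                                   List<String> referencedDirs) {
        return CompletableFuture.supplyAsync(() -> {
            int deleted = 0;
            for (String dir : checkpointDirs) {
                if (!referencedDirs.contains(dir)) {
                    // a real implementation would delete the directory here
                    deleted++;
                }
            }
            return deleted;
        }, ioExecutor);
    }

    public void shutdown() {
        ioExecutor.shutdown();
    }

    public static void main(String[] args) {
        AsyncCheckpointCleanup cleanup = new AsyncCheckpointCleanup();
        // chk-3 is still referenced by the HA data; chk-1 and chk-2 are stale.
        int deleted = cleanup.cleanUpAsync(
                List.of("chk-1", "chk-2", "chk-3"),
                List.of("chk-3")).join();
        System.out.println(deleted); // 2
        cleanup.shutdown();
    }
}
```

The key point is only that the caller gets a future back instead of blocking; the deletion work itself runs on the I/O pool.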

Best,
Yang

Matthias Pohl <matthias.p...@aiven.io.invalid> 于2022年10月27日周四 20:20写道:

> I would like to bring this topic up one more time. I put some more thought
> into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
> updated version of what I summarized in my previous email. It would be
> interesting to get some additional perspectives on this; more specifically,
> the two included proposals about either just repurposing the
> CompletedCheckpointStore into a more generic CheckpointStore or refactoring
> the StateHandleStore interface, moving all the cleanup logic from the
> CheckpointsCleaner and StateHandleStore into what's currently called
> CompletedCheckpointStore.
>
> Looking forward to feedback on that proposal.
>
> Best,
> Matthias
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <matthias.p...@aiven.io>
> wrote:
>
> > Hi everyone,
> >
> > I’d like to start a discussion on repeatable cleanup of checkpoint data.
> > In FLIP-194 [1] we introduced repeatable cleanup of HA data alongside the
> > introduction of the JobResultStore component. The goal was to make Flink
> > responsible for cleaning up the data it owns. The Flink cluster should
> > only shut down gracefully after all its artifacts are removed. That way,
> > one would not accidentally end up with abandoned artifacts.
> >
> > We forgot to cover one code path around cleaning up checkpoint data.
> > Currently, in case of an error (e.g. permission issues), the
> > CheckpointsCleaner attempts to clean up checkpoints and leaves them as-is
> > if that cleanup fails; only a log message is printed. The user is then
> > responsible for cleaning up the data. This was discussed as part of the
> > release testing efforts for Flink 1.15 in FLINK-26388 [2].
> >
> > We could add repeatable cleanup in the CheckpointsCleaner. We would have
> > to make sure that all StateObject#discardState implementations are
> > idempotent. This is not necessarily the case right now (see FLINK-26606
> > [3]).
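To illustrate what idempotency means here: discarding state that was already (partially) removed by an earlier attempt must count as success, not as an error, otherwise retries can never converge. A minimal sketch under that assumption (a hypothetical file-backed state handle, not Flink's actual StateObject implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical sketch of an idempotent discardState(): deleting state
// that is already gone is treated as success, so repeated cleanup
// attempts after a partial failure are safe.
public class FileStateHandle {

    private final Path path;

    public FileStateHandle(Path path) {
        this.path = path;
    }

    public void discardState() throws IOException {
        try {
            Files.delete(path);
        } catch (NoSuchFileException e) {
            // Already deleted by a previous (possibly failed) attempt:
            // swallow the error instead of propagating it.
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("state", ".bin");
        FileStateHandle handle = new FileStateHandle(tmp);
        handle.discardState(); // deletes the file
        handle.discardState(); // second call is a no-op, no exception
        System.out.println(Files.exists(tmp)); // false
    }
}
```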
> >
> > Additionally, there is the problem of losing information about which
> > Checkpoints are subject to cleanup in case of JobManager failovers. These
> > Checkpoints are not stored as part of the HA data. Moreover,
> > PendingCheckpoints are not serialized in any way, either. None of these
> > artifacts are picked up again after a failover. I see the following
> > options here:
> >
> >    -
> >
> >    The purpose of CompletedCheckpointStore needs to be extended to become
> >    a “general” CheckpointStore. It will store PendingCheckpoints and
> >    CompletedCheckpoints that are marked for deletion. After a failover,
> >    CheckpointsCleaner can pick up these instances again and continue with
> >    the deletion process.
> >
> > The flaw of that approach is that we’re increasing the amount of data
> > that is stored in the underlying StateHandleStore. Additionally, we’re
> > going to have an increased number of accesses to the
> > CompletedCheckpointStore. These accesses need to happen in the main
> > thread; more specifically, adding PendingCheckpoints and marking
> > Checkpoints for deletion.
> >
> >    -
> >
> >    We’re actually interested in cleaning up artifacts from the
> >    FileSystem, i.e. the artifacts created by the StateHandleStore used
> >    within the DefaultCompletedCheckpointStore containing the serialized
> >    CompletedCheckpoint instance and the checkpoint’s folder containing
> >    the actual operator states. We could adapt the CompletedCheckpointStore
> >    in a way that any Checkpoint (including PendingCheckpoint) is
> >    serialized and persisted on the FileSystem right away (which is
> >    currently done within the StateHandleStore implementations when adding
> >    CompletedCheckpoints to the underlying HA system). The corresponding
> >    FileStateHandleObject (referring to that serialized CompletedCheckpoint)
> >    that gets persisted to ZooKeeper/k8s ConfigMap in the end would only be
> >    written once the CompletedCheckpoint is finalized and can be used. The
> >    CheckpointsCleaner could recover any artifacts from the FileSystem and
> >    clean up anything that’s not listed in ZooKeeper/k8s ConfigMap.
> >
> > This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
> > but would still require additional IO on the FileSystem. It would also
> > require some larger refactoring: the RetrievableStateHandle instances
> > currently handled internally (i.e. in the StateHandleStore) would need to
> > become public.
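The second proposal boils down to a mark-and-sweep over the FileSystem: the HA backend (ZooKeeper / k8s ConfigMap) only lists finalized checkpoints, so anything found on the FileSystem that the HA store does not reference is an orphan and can be deleted. A hedged sketch with hypothetical names (a real implementation would go through Flink's FileSystem abstraction rather than plain path strings):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the FileSystem-based cleanup in proposal #2:
// compare what exists on the FileSystem against what the HA backend
// references; the difference is garbage left behind by a failover.
public class OrphanedCheckpointSweeper {

    /**
     * Returns the checkpoint paths that exist on the FileSystem but are
     * not referenced by the HA store, i.e. the ones safe to delete.
     */
    public static Set<String> findOrphans(List<String> fileSystemEntries,
                                          List<String> haReferencedEntries) {
        Set<String> orphans = new HashSet<>(fileSystemEntries);
        orphans.removeAll(haReferencedEntries);
        return orphans;
    }

    public static void main(String[] args) {
        // chk-42 was finalized and registered in the ConfigMap; chk-43 was
        // still pending (or marked for deletion) when the JobManager failed.
        Set<String> orphans = findOrphans(
                List.of("/cp/chk-42", "/cp/chk-43"),
                List.of("/cp/chk-42"));
        System.out.println(orphans); // [/cp/chk-43]
    }
}
```

This keeps all cleanup bookkeeping on the FileSystem and reads the HA store only once, which matches the goal of not adding write traffic to the APIServer.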
> >
> >    -
> >
> >    We just add repeatable cleanup to the CheckpointsCleaner as is. No
> >    cleanup is picked up again after recovery. This could be a fallback
> >    for users who want to reduce IO.
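For the third option, "repeatable" essentially means retrying a failed discard a bounded number of times with backoff instead of giving up after a single log message. A hypothetical retry loop (not Flink's actual CheckpointsCleaner logic) could look like:

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of repeatable cleanup: retry a failing discard a
// bounded number of times with linear backoff before giving up.
public class RetryingCleanup {

    public static boolean runWithRetries(Callable<Void> discard,
                                         int maxAttempts,
                                         long backoffMillis) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                discard.call();
                return true; // cleanup succeeded
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    return false; // caller logs and leaves artifacts behind
                }
                Thread.sleep(backoffMillis * attempt);
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice (e.g. a transient permission or connectivity issue),
        // then succeeds on the third attempt.
        boolean ok = runWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure");
            }
            return null;
        }, 5, 1L);
        System.out.println(ok + " after " + calls[0] + " attempts");
    }
}
```

Note this still loses all retry state on a JobManager failover, which is exactly the limitation the option accepts in exchange for lower IO.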
> >
> >
> > I’m interested in initial opinions from the community on that matter
> > before starting to work on a final FLIP.
> >
> > Best,
> >
> > Matthias
> >
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
> >
> > [2] https://issues.apache.org/jira/browse/FLINK-26388
> >
> > [3] https://issues.apache.org/jira/browse/FLINK-26606
> >
>
