This is a nice FLIP. I particularly like how much background it provides
on the issue; something that other FLIPs could certainly benefit from...
I went over the FLIP and had a chat with Matthias about it.
Somewhat unrelated to the FLIP, we found a flaw in the current cleanup
mechanism for failed checkpoints, where the JM deletes files while a TM
may still be in the process of writing checkpoint data. This is because
we never wait for an ack from the TMs that have aborted the checkpoint.
We additionally noted that when incremental checkpoints are enabled we
might be storing a large number of checkpoints in HA, but we did not
reach a conclusion on what to do about it.
As for the FLIP itself, I'm concerned about proposal #2 because it
requires iterating over the entire checkpoint directory on /any/
failover to find checkpoints that can be deleted. This can be an
expensive operation for certain filesystems (S3), particularly when
incremental checkpoints are being used.
In the interest of fast failovers we ideally don't use mechanisms that
scale with.../anything/, really.
However, storing more data in HA is also concerning, as Yang Wang
pointed out.
To avoid increasing the number of requests made against HA we could
consider piggy-backing delete operations on other HA operations, like
the checkpoint counter increments.
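To make that idea a bit more concrete, here is a minimal sketch of what such
piggy-backing could look like against ZooKeeper via Curator's transaction API.
The paths, the plain setData-based counter, and the class name are made up for
illustration (IIRC the actual checkpoint ID counter is backed by Curator's
shared counter), so treat this as a sketch of the direction rather than a
proposed implementation:

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

public class PiggybackedCleanupSketch {

    /**
     * Bumps the (hypothetical) checkpoint counter node and removes the handle
     * of a checkpoint that is subject to cleanup in one atomic multi-op, so
     * the cleanup does not cost an extra round-trip against the HA backend.
     */
    public static List<CuratorTransactionResult> incrementAndDelete(
            CuratorFramework client,
            String counterPath,
            long nextCheckpointId,
            String staleHandlePath) throws Exception {

        CuratorOp increment =
                client.transactionOp()
                        .setData()
                        .forPath(
                                counterPath,
                                Long.toString(nextCheckpointId)
                                        .getBytes(StandardCharsets.UTF_8));

        CuratorOp delete = client.transactionOp().delete().forPath(staleHandlePath);

        // Both operations either succeed or fail together.
        return client.transaction().forOperations(increment, delete);
    }
}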
On that note, do we have any benchmarks for HA? I remember we looked
into that at some point, for 1.15 I believe. With HA load being such a
major concern for this FLIP it would be good to have _something_ to
measure it with.
On 27/10/2022 14:20, Matthias Pohl wrote:
I would like to bring this topic up one more time. I put some more thought
into it and created FLIP-270 [1] as a follow-up to FLIP-194 [2] with an
updated version of what I summarized in my previous email. It would be
interesting to get some additional perspectives on this, more specifically
on the two included proposals: either just repurposing the
CompletedCheckpointStore into a more generic CheckpointStore, or refactoring
the StateHandleStore interface and moving all the cleanup logic from the
CheckpointsCleaner and StateHandleStore into what's currently called
CompletedCheckpointStore.
Looking forward to feedback on that proposal.
Best,
Matthias
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <matthias.p...@aiven.io>
wrote:
Hi everyone,
I’d like to start a discussion on repeatable cleanup of checkpoint data.
In FLIP-194 [1] we introduced repeatable cleanup of HA data alongside the
introduction of the JobResultStore component. The goal was to put Flink in
charge of cleaning up the data it owns. The Flink cluster should only shut
down gracefully after all of its artifacts have been removed. That way, no
abandoned artifacts are accidentally left behind.
We forgot to cover one code path around cleaning up checkpoint data.
Currently, in case of an error (e.g. permission issues), the
CheckpointsCleaner tries to clean up checkpoints once and, if that cleanup
fails, leaves them as they are. A log message is printed and the user is
responsible for cleaning up the data. This was discussed as part of the
release testing efforts for Flink 1.15 in FLINK-26388 [2].
We could add repeatable cleanup to the CheckpointsCleaner. For that, we
would have to make sure that all StateObject#discardState implementations
are idempotent. This is not necessarily the case right now (see FLINK-26606
[3]).
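To illustrate what idempotency means in this context, here is a minimal
sketch of a file-backed state object whose cleanup can safely be re-run. The
class name is hypothetical and java.nio is only used to keep the example
self-contained; the actual implementations go through Flink's FileSystem
abstraction:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileBackedStateObjectSketch {

    private final Path filePath;

    public FileBackedStateObjectSketch(String path) {
        this.filePath = Paths.get(path);
    }

    /**
     * Mirrors the StateObject#discardState contract: deleting a file that is
     * already gone is treated as success, because the desired end state (the
     * file being absent) has been reached. A retried cleanup therefore simply
     * becomes a no-op instead of failing.
     */
    public void discardState() throws IOException {
        // deleteIfExists returns false if the file was already missing,
        // which is exactly the behavior an idempotent cleanup needs.
        Files.deleteIfExists(filePath);
    }

    public long getStateSize() throws IOException {
        return Files.exists(filePath) ? Files.size(filePath) : 0L;
    }
}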
Additionally, there is the problem of losing information about which
Checkpoints are subject to cleanup in case of JobManager failovers. These
Checkpoints are not stored as part of the HA data. Moreover,
PendingCheckpoints are not serialized in any way, either. None of these
artifacts are picked up again after a failover. I see the following options
here (rough sketches of what they could look like follow after the list):
- The purpose of CompletedCheckpointStore needs to be extended to become
a “general” CheckpointStore. It will store PendingCheckpoints and
CompletedCheckpoints that are marked for deletion. After a failover,
CheckpointsCleaner can pick up these instances again and continue with
the deletion process.
The flaw of that approach is that we’re increasing the amount of data that
is stored in the underlying StateHandleStore. Additionally, we’re going
to have an increased number of accesses to the CompletedCheckpointStore.
These accesses need to happen in the main thread; more specifically, adding
PendingCheckpoints and marking Checkpoints for deletion.
- We're actually interested in cleaning up artifacts from the
FileSystem, i.e. the artifacts created by the StateHandleStore used
within the DefaultCompletedCheckpointStore containing the serialized
CompletedCheckpoint instance and the checkpoint’s folder containing
the actual operator states. We could adapt the CompletedCheckpointStore
in a way that any Checkpoint (including PendingCheckpoint) is
serialized and persisted on the FileSystem right away (which is currently
done within the StateHandleStore implementations when adding
CompletedCheckpoints to the underlying HA system). The corresponding
FileStateHandleObject (referring to that serialized CompletedCheckpoint)
that gets persisted to ZooKeeper/k8s ConfigMap in the end would only be
written once the CompletedCheckpoint is finalized and can be used. The
CheckpointsCleaner could recover any artifacts from the FileSystem and
clean up anything that's not listed in ZooKeeper/k8s ConfigMap.
This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
but would still require additional IO on the FileSystem. It would also
require some larger refactoring: the RetrievableStateHandle, which is
currently handled internally (i.e. in the StateHandleStore), would need to
become public.
- We just add repeatable cleanup to the CheckpointsCleaner as is. No
cleanup is picked up again after recovery. This could serve as a fallback
for users who want to reduce IO.
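To make the first two options a bit more tangible, here are two rough
sketches. All class and method names are made up for illustration and do not
claim to match the existing Flink interfaces.

For the first option, the repurposed store could look roughly like this:

import java.util.List;

public interface CheckpointStoreSketch<T> {

    /**
     * Registers a checkpoint that is still in progress so that its artifacts
     * can be found again after a JobManager failover.
     */
    void addPendingCheckpoint(T pendingCheckpoint) throws Exception;

    /** Promotes a pending checkpoint to a completed one once it is usable. */
    void addCompletedCheckpoint(T completedCheckpoint) throws Exception;

    /**
     * Marks a checkpoint (pending or completed) for deletion; the
     * CheckpointsCleaner picks these up, also after a failover.
     */
    void markForDeletion(long checkpointId) throws Exception;

    /**
     * Returns all checkpoints that are marked for deletion but whose
     * artifacts have not been removed yet.
     */
    List<T> getCheckpointsMarkedForDeletion() throws Exception;
}

For the second option, the cleanup side essentially boils down to diffing the
checkpoint directory against what the HA backend still references; the
bounded retry at the end is also roughly what the third option amounts to on
its own (it relies on discardState being idempotent, see the sketch further
up):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Stream;

public final class OrphanedCheckpointCleanupSketch {

    /**
     * Deletes every entry below checkpointBaseDir that is no longer
     * referenced from ZooKeeper/k8s ConfigMap. Note that real checkpoint
     * directories would need recursive deletion through Flink's FileSystem
     * abstraction; this is kept flat for brevity.
     */
    public static void cleanUpOrphans(Path checkpointBaseDir, Set<Path> referencedByHa)
            throws IOException {
        try (Stream<Path> entries = Files.list(checkpointBaseDir)) {
            entries.filter(entry -> !referencedByHa.contains(entry))
                    .forEach(OrphanedCheckpointCleanupSketch::deleteWithRetry);
        }
    }

    /** Simple bounded retry; a real implementation would log and back off. */
    private static void deleteWithRetry(Path orphan) {
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                Files.deleteIfExists(orphan);
                return;
            } catch (IOException e) {
                // Swallowed here for brevity; the next attempt retries the delete.
            }
        }
    }
}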
I’m interested in initial opinions from the community on that matter
before starting to work on a final FLIP.
Best,
Matthias
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
[2] https://issues.apache.org/jira/browse/FLINK-26388
[3] https://issues.apache.org/jira/browse/FLINK-26606