[ 
https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444489#comment-17444489
 ] 

Stephan Ewen commented on FLINK-24852:
--------------------------------------

Very good comments in general, thanks!

(1) About TaskManager-owned state:

That is a good point. If we introduce this, we will have state that is not 
referenced by the SharedStateRegistry and still should not be cleaned up.

To solve that, I assume we either need a way to iterate over all state handles 
in all completed checkpoints, or to let TM-owned state be registered in a 
separate set that the cleanup process could use.

(2) FLINK-11868 looks like a good change in general, and I agree that this 
should be a prerequisite for this issue.

(3) JM failover and a REST API call sound like good points at which to trigger this cleanup.

I think orphaned state can also happen in cases where the TM wrote the file, 
but crashed before it could ack to the JM.

(4) Regarding cleaning up old job directories: Would be interesting to support, 
but I agree with Yun Tang that this is super delicate. I would leave this out of 
the first scope.

(5) Using "previous+1" for preemptively written state sounds like it should 
work.
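
As an illustration of point (5), here is a minimal sketch that assumes the 
"<checkpointID>_<UUID>" file naming proposed in the issue description. The 
class and method names are hypothetical and not part of Flink. State written 
before its checkpoint ID is known is tagged with the previous checkpoint ID 
plus one, so a cleanup that only considers artifacts older than the oldest 
retained checkpoint would not treat it as older than the checkpoint that may 
later reference it.

```java
import java.util.OptionalLong;
import java.util.UUID;

/** Hypothetical naming helper for shared state files; not a Flink API. */
final class SharedStateNaming {
    private SharedStateNaming() {}

    /** Name for state written as part of a checkpoint whose ID is known. */
    static String nameFor(long checkpointId) {
        return checkpointId + "_" + UUID.randomUUID();
    }

    /** Name for state written preemptively, tagged with "previous+1". */
    static String nameForPreemptive(long previousCheckpointId) {
        return nameFor(previousCheckpointId + 1);
    }

    /** Recovers the checkpoint ID prefix, or empty if the name does not match. */
    static OptionalLong checkpointIdOf(String fileName) {
        int sep = fileName.indexOf('_');
        if (sep <= 0) {
            return OptionalLong.empty();
        }
        try {
            long id = Long.parseLong(fileName.substring(0, sep));
            return id >= 0 ? OptionalLong.of(id) : OptionalLong.empty();
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
```

With these assumptions, SharedStateNaming.nameForPreemptive(41) yields a name 
starting with "42_", and checkpointIdOf recovers 42 from it.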

> Cleanup of Orphaned Incremental State Artifacts
> -----------------------------------------------
>
>                 Key: FLINK-24852
>                 URL: https://issues.apache.org/jira/browse/FLINK-24852
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.14.0
>            Reporter: Stephan Ewen
>            Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS / 
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the 
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails, all state newly added for the currently pending 
> checkpoint will be orphaned.
> These state artifacts currently cannot be cleaned up manually, because it is 
> not easy to tell whether they are still in use (referenced by any 
> checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such 
> orphaned state artifacts.
> h2. Idea for a cleanup mechanism
> A cleanup thread would periodically execute a cleanup procedure that 
> searches for and deletes the orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs:
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify for each state artifact from which checkpoint it was 
> created.
> The cleanup procedure would
> * enumerate all state artifacts (for example files in the "shared" directory)
> * For each one, check whether it was created earlier than the oldest retained 
> checkpoint. If not, that artifact would be skipped, because it might come 
> from a later pending checkpoint, or a later canceled checkpoint.
> * Finally, the procedure checks if the state artifact is known by the shared 
> state registry. If yes, the artifact is kept, if not, it is orphaned and will 
> be deleted.
> Because the cleanup procedure is specific to the checkpoint storage, it 
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was 
> created, we can put that checkpoint ID into the state file name, for example 
> format the state name as {{"<checkpointID>_<UUID>"}}.
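
The cleanup procedure described in the issue can be sketched roughly as 
follows. This is a simplified illustration, not Flink code: the class and 
method names are hypothetical, artifacts and the shared state registry 
snapshot are modeled as plain file-name collections, and the 
"<checkpointID>_<UUID>" naming scheme is assumed. The sketch only identifies 
orphans; deletion, and any additional protection for TaskManager-owned state, 
would be left to the caller.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical sketch of the orphan detection step; not a Flink API. */
final class OrphanCleanupSketch {
    private OrphanCleanupSketch() {}

    /** Parses the checkpoint ID from a "<checkpointID>_<UUID>" file name. */
    static OptionalLong checkpointIdOf(String name) {
        int sep = name.indexOf('_');
        if (sep <= 0) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(name.substring(0, sep)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /**
     * Returns the artifacts that are older than the oldest retained
     * checkpoint and unknown to the registry snapshot.
     */
    static List<String> findOrphans(
            Collection<String> artifactNames,
            long oldestRetainedCheckpointId,
            Set<String> registrySnapshot) {
        List<String> orphans = new ArrayList<>();
        for (String name : artifactNames) {
            OptionalLong id = checkpointIdOf(name);
            // Skip names we cannot attribute to a checkpoint, and artifacts
            // that may belong to a pending or later canceled checkpoint.
            if (!id.isPresent() || id.getAsLong() >= oldestRetainedCheckpointId) {
                continue;
            }
            // Old enough to judge: keep it only if the registry knows it.
            if (!registrySnapshot.contains(name)) {
                orphans.add(name);
            }
        }
        return orphans;
    }
}
```

Skipping names without a parseable checkpoint ID is a conservative choice made 
for this sketch: anything that cannot be attributed to a checkpoint is kept 
rather than deleted.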



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
