[ https://issues.apache.org/jira/browse/FLINK-24852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444489#comment-17444489 ]
Stephan Ewen commented on FLINK-24852:
--------------------------------------

Very good comments in general, thanks!

(1) About TaskManager-owned state: That is a good point. If we introduce this, we will have state that is not referenced by the SharedStateRegistry and still should not be cleaned up. To solve that, I assume we either need a way to iterate over all state handles in all completed checkpoints, or to let TM-owned state be registered in a separate set that the cleanup process could use.

(2) FLINK-11868 looks like a good change in general, and I agree that this should be a prerequisite for this issue.

(3) A JM failover and a REST API call sound like good points at which to trigger this cleanup. I think orphaned state can also happen in cases where the TM wrote the file, but crashed before it could ack to the JM.

(4) Regarding cleaning up old job directories: Would be interesting to support, but I agree with Yun Tang that this is super delicate. I would leave this out of the first scope.

(5) Using "previous+1" for preemptively written state sounds like it should work.

> Cleanup of Orphaned Incremental State Artifacts
> -----------------------------------------------
>
>                 Key: FLINK-24852
>                 URL: https://issues.apache.org/jira/browse/FLINK-24852
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.14.0
>            Reporter: Stephan Ewen
>            Priority: Major
>
> Shared State Artifacts (state files in the "shared" folder in the DFS /
> ObjectStore) can become orphaned in various situations:
> * When a TaskManager fails right after it created a state file but before the
> checkpoint was ack-ed to the JobManager, that state file will be orphaned.
> * When the JobManager fails, all state newly added for the currently pending
> checkpoint will be orphaned.
> These state artifacts are currently impossible to clean up manually,
> because it is not easy to tell whether they are still being
> used (referenced by any checkpoint).
> We should introduce a "garbage collector" that identifies and deletes such
> orphaned state artifacts.
>
> h2. Idea for a cleanup mechanism
>
> A cleanup thread would periodically execute a cleanup procedure that
> searches for and deletes the orphaned artifacts.
> To identify those artifacts, the cleanup procedure needs the following inputs:
> * The oldest retained checkpoint ID
> * A snapshot of the shared state registry
> * A way to identify, for each state artifact, the checkpoint for which it was
> created.
> The cleanup procedure would
> * Enumerate all state artifacts (for example, files in the "shared" directory).
> * For each one, check whether it was created earlier than the oldest retained
> checkpoint. If not, that artifact would be skipped, because it might come
> from a later pending checkpoint or a later canceled checkpoint.
> * Finally, check whether the state artifact is known to the shared
> state registry. If yes, the artifact is kept; if not, it is orphaned and will
> be deleted.
> Because the cleanup procedure is specific to the checkpoint storage, it
> should probably be instantiated from the checkpoint storage.
> To make it possible to identify the checkpoint for which a state artifact was
> created, we can put that checkpoint ID into the state file name, for example
> format the state name as {{"<checkpointID>_<UUID>"}}.
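
To make the proposed procedure concrete, here is a minimal sketch of the orphan check in Java. It is only an illustration under several assumptions, not Flink code: the class OrphanedStateCleanup and the methods findOrphans and parseCheckpointId are hypothetical names, the snapshot of the shared state registry is modelled as a plain set of artifact names, and file names that do not match the {{"<checkpointID>_<UUID>"}} format are conservatively skipped rather than deleted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical sketch of the orphan check; none of these names exist in Flink. */
final class OrphanedStateCleanup {

    private OrphanedStateCleanup() {}

    /** Extracts the checkpoint ID from a name of the form "<checkpointID>_<UUID>". */
    static OptionalLong parseCheckpointId(String artifactName) {
        int separator = artifactName.indexOf('_');
        if (separator <= 0) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(artifactName.substring(0, separator)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /**
     * Returns the artifacts that were created for a checkpoint older than the
     * oldest retained one and that the registry snapshot does not know about.
     */
    static List<String> findOrphans(
            Collection<String> artifactNames,
            long oldestRetainedCheckpointId,
            Set<String> registeredNames) {
        List<String> orphans = new ArrayList<>();
        for (String name : artifactNames) {
            OptionalLong checkpointId = parseCheckpointId(name);
            if (!checkpointId.isPresent()) {
                // Assumption: unknown naming scheme, so keep the file to be safe.
                continue;
            }
            if (checkpointId.getAsLong() >= oldestRetainedCheckpointId) {
                // May belong to a pending or later canceled checkpoint: skip.
                continue;
            }
            if (!registeredNames.contains(name)) {
                // Old enough and not referenced by the registry: orphaned.
                orphans.add(name);
            }
        }
        return orphans;
    }
}
```

For example, with an oldest retained checkpoint ID of 10 and a registry snapshot containing only "4_b", the artifacts "3_a", "4_b", "10_c" and "12_d" would yield "3_a" as the single orphan. Following point (1) of the comment, TM-owned state would have to be added to the registered set (or checked against a separate set) before anything is deleted.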