[ https://issues.apache.org/jira/browse/FLINK-10930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16712017#comment-16712017 ]
Stephan Ewen edited comment on FLINK-10930 at 12/6/18 8:53 PM:
---------------------------------------------------------------

[~yunta] Better exception handling is always a good idea.

I would suggest proceeding as follows:
  - We work towards a model where checkpoint failures never cause task failures, but only "decline messages". As far as I know, [~azagrebin] mentioned that there is already some discussion about this. Could you chime in here, Andrey?
  - We fix all cases we find where state is left behind after an exception.
  - We introduce an exclusive state cleanup daemon (see below)
  - We introduce a shared state cleanup daemon (see below)

h3. Cleaning up exclusive state (idea)
  - When the daemon starts a cleanup sweep, it determines the number of the oldest checkpoint that is still retained.
  - It enumerates all directories "CHK-XXX" and recursively deletes the ones older than the oldest retained checkpoint.

h3. Cleaning up shared state (idea)
  - Every shared state file needs to encode the checkpoint-id of the checkpoint during which it was created.
  - When the daemon does a cleanup sweep, it atomically grabs the number of the latest completed checkpoint and a snapshot of all shared state handles in the shared state registry.
  - The daemon then lists all files in the shared state directory and deletes every file that is not referenced by a shared state handle and is older than the latest completed checkpoint.

What do you think about that?


> Refactor checkpoint directory layout
> ------------------------------------
>
>                 Key: FLINK-10930
>                 URL: https://issues.apache.org/jira/browse/FLINK-10930
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.8.0
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.8.0
>
>
> The current checkpoint directory layout was introduced in FLINK-8531 with
> three different scopes for tasks:
>  * *EXCLUSIVE* is for state that belongs to one checkpoint only: metadata
> and operator state files.
>  * *SHARED* is for state that is possibly part of multiple checkpoints.
>  * *TASKOWNED* is for state that must never be dropped by the JobManager.
> {code:java}
> /user-defined-dir/{job-id}
>     |
>     +-- shared/
>     +-- taskowned/
>     +-- chk-1/   // metadata and operator-state files
>     +-- chk-2/
>     ...{code}
> If we retain just one completed checkpoint, only one exclusive directory,
> i.e. one {{chk-id}} checkpoint directory, should be left.
> However, as FLINK-10855 explains, the directories of failed/expired
> checkpoints are also left behind. This is really confusing for users who [use
> externalized checkpoints to resume
> jobs|https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint],
> not to mention the resource leak in the checkpoint directory.
> As far as I know, if a {{chk-id}} checkpoint directory still contains
> operator state files, there is no way to clean up the useless {{chk-id}}
> checkpoint directory gracefully. Once the job manager disposes of a
> failed/expired checkpoint, the JM deletes the target {{chk-id}} checkpoint
> directory. However, this directory can also be created by tasks that have
> not yet reported to the JM. When the {{checkpoint coordinator}} receives the
> late reports of those expired tasks, it discards the useless handles.
> However, if the JM also tries to delete the empty parent folder, which is no
> longer supported after FLINK-8540, another task uploading operator state
> files would hit an exception because the parent directory of its write
> target has just been removed. Currently, we treat a task checkpoint failure
> as a task failure, and the whole job fails over, which is not what we want.
> I therefore plan to split the *EXCLUSIVE* directory into two kinds of
> exclusive directories: one is still a set of {{chk-id}} checkpoint
> directories, each containing only its exclusive {{meta data}}; the other is
> a single directory named {{exclusive}} that contains the operator state
> files. Operator state files are exclusive to one specific checkpoint, so we
> could also add the {{checkpoint-id}} to their file names to let users clean
> them up easily.
> The refactored directory layout should be:
> {code:java}
> /user-defined-dir/{job-id}
>     |
>     +-- shared/
>     +-- taskowned/
>     +-- exclusive/   // operator state files
>     +-- chk-1/       // metadata
>     +-- chk-2/
>     ...{code}
>
> This new directory layout would not affect users who use externalized
> checkpoints to resume jobs, since they would still pass just the
> {{/user-defined-dir/job-id/chk-id}} path to resume a job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
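
As a rough illustration of the two cleanup sweeps proposed in the comment above, here is a minimal sketch in Java. It only decides which names qualify for deletion; listing the directories and deleting the files is left out. The class and method names are made up for this example and are not Flink APIs. The {{<checkpoint-id>-<suffix>}} naming for shared state files is also an assumption, since the comment only says that the file needs to encode the checkpoint-id somehow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; nothing here exists in Flink. */
final class CleanupSweepSketch {

    // Checkpoint directories are assumed to be named "chk-<id>" (case-insensitive).
    private static final Pattern CHK_DIR = Pattern.compile("(?i)chk-(\\d{1,18})");

    // ASSUMPTION: shared state files are named "<creating-checkpoint-id>-<suffix>".
    private static final Pattern SHARED_FILE = Pattern.compile("(\\d{1,18})-.+");

    private CleanupSweepSketch() {}

    /** Extracts the checkpoint id from a name, or returns empty if the name does not match. */
    static OptionalLong parseId(Pattern pattern, String name) {
        Matcher m = pattern.matcher(name);
        return m.matches() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    /** Exclusive sweep: chk directories strictly older than the oldest retained checkpoint. */
    static List<String> exclusiveDirsToDelete(List<String> dirNames, long oldestRetainedId) {
        List<String> result = new ArrayList<>();
        for (String name : dirNames) {
            OptionalLong id = parseId(CHK_DIR, name);
            if (id.isPresent() && id.getAsLong() < oldestRetainedId) {
                result.add(name);
            }
        }
        return result;
    }

    /**
     * Shared sweep: files that are not referenced in the registry snapshot and were
     * created by a checkpoint strictly older than the latest completed checkpoint.
     * The snapshot and the checkpoint id are expected to have been taken atomically.
     */
    static List<String> sharedFilesToDelete(
            List<String> fileNames, Set<String> referencedSnapshot, long latestCompletedId) {
        List<String> result = new ArrayList<>();
        for (String name : fileNames) {
            OptionalLong id = parseId(SHARED_FILE, name);
            boolean oldEnough = id.isPresent() && id.getAsLong() < latestCompletedId;
            if (oldEnough && !referencedSnapshot.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }
}
```

Names that do not match the expected pattern (for example {{shared}} or {{taskowned}}) are skipped instead of deleted, so the sweep never touches anything it cannot attribute to a checkpoint. Files created by checkpoints that are still in progress are also kept, because their ids are not older than the latest completed checkpoint.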