[ https://issues.apache.org/jira/browse/FLINK-10855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686065#comment-16686065 ]

Yun Tang commented on FLINK-10855:
----------------------------------

[~till.rohrmann], as far as I know, the checkpoint directory was introduced in
FLINK-8531, but it is left behind because, after FLINK-8540, discarding the
state no longer deletes its parent directory. I think FLINK-8540 is really
essential, because listing a large directory is expensive, not only on S3 but
also on HDFS. However, this issue not only leaves orphaned directories behind
but might also cause a job failover. Suppose the currently ongoing checkpoint
is '{{chk-4}}' and it expires after 10 minutes. The {{CheckpointCoordinator}}
then drops this checkpoint and deletes the '{{chk-4}}' folder. However, if an
unresponsive task creates its operator state handle before the parent
'{{chk-4}}' folder is deleted but closes the handle after the folder has been
deleted, an exception is thrown during the asynchronous checkpoint phase,
which by default results in a failover of the whole job.
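To make the failure mode concrete, here is a minimal, self-contained simulation on the local file system. It assumes a stream that stages its data in a temporary file and only moves it into '{{chk-4}}' on {{close()}}; this commit-on-close behaviour is an assumption chosen to make the race reproducible locally, and real file systems differ in exactly when a missing parent directory surfaces as an error. {{LateCloseRace}} and {{CommitOnCloseStream}} are hypothetical names, not Flink APIs.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LateCloseRace {

    /** Hypothetical stream: stages data in a temp file, commits it into the checkpoint dir on close. */
    static final class CommitOnCloseStream extends OutputStream {
        private final Path tmp;
        private final Path target;
        private final OutputStream out;

        CommitOnCloseStream(Path tmpDir, Path target) throws IOException {
            this.tmp = Files.createTempFile(tmpDir, "state-", ".tmp");
            this.target = target;
            this.out = Files.newOutputStream(tmp);
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
        }

        @Override
        public void close() throws IOException {
            out.close();
            // Throws NoSuchFileException if the parent directory was deleted in the meantime.
            Files.move(tmp, target);
        }
    }

    static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Returns true if the late task's close() failed because chk-4 was already gone. */
    static boolean lateCloseFails() throws IOException {
        Path base = Files.createTempDirectory("checkpoints");
        Path tmpDir = Files.createDirectory(base.resolve("tmp"));
        Path chk4 = Files.createDirectory(base.resolve("chk-4"));
        try {
            // 1. The unresponsive task opens its operator state stream while chk-4 exists.
            CommitOnCloseStream stream = new CommitOnCloseStream(tmpDir, chk4.resolve("op-state"));
            stream.write(42);

            // 2. The checkpoint expires; the coordinator drops it and deletes chk-4.
            deleteRecursively(chk4);

            // 3. The task finally closes its stream in the asynchronous checkpoint phase.
            stream.close();
            return false;
        } catch (NoSuchFileException e) {
            return true; // in Flink, such an exception fails the async part of the checkpoint
        } finally {
            deleteRecursively(base);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("late close failed: " + lateCloseFails());
    }
}
```

Running {{main}} should report that the late close failed, mirroring steps 1 to 3 of the scenario: the handle is created while '{{chk-4}}' exists, the folder is deleted on expiry, and only the final close observes the missing parent.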

We plan to split the previous EXCLUSIVE folder into two kinds of folders: one
for metadata only, and another containing exclusive data such as operator
state. Besides, we could also add the checkpoint id to the materialized file
names, which makes it easy for both users and cleanup hooks to determine when
a checkpoint's files can be deleted.
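A minimal sketch of the file-naming idea, assuming a hypothetical scheme {{chk-<checkpointId>-<uuid>}} for exclusive state files (this is not Flink's actual naming). Because exclusive state belongs to exactly one checkpoint, a cleanup hook could decide from the file name alone, without reading any metadata or listing other folders, whether a file may be deleted. {{CheckpointFileNames}}, {{newFileName}}, {{checkpointIdOf}} and {{canDelete}} are illustrative names only.

```java
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CheckpointFileNames {

    // Hypothetical naming scheme: chk-<checkpointId>-<random UUID>
    private static final Pattern NAME = Pattern.compile("chk-(\\d+)-[0-9a-f\\-]{36}");

    static String newFileName(long checkpointId) {
        return "chk-" + checkpointId + "-" + UUID.randomUUID();
    }

    static Optional<Long> checkpointIdOf(String fileName) {
        Matcher m = NAME.matcher(fileName);
        return m.matches() ? Optional.of(Long.parseLong(m.group(1))) : Optional.empty();
    }

    /**
     * Exclusive state belongs to exactly one checkpoint, so it can be deleted once that
     * checkpoint is neither pending nor retained. Files with unknown names are never deleted.
     */
    static boolean canDelete(String fileName, Set<Long> liveCheckpointIds) {
        return checkpointIdOf(fileName)
                .map(id -> !liveCheckpointIds.contains(id))
                .orElse(false);
    }

    public static void main(String[] args) {
        String expired = newFileName(3);
        String current = newFileName(4);
        Set<Long> live = Set.of(4L); // e.g. checkpoint 4 is still pending or retained
        System.out.println(expired + " deletable: " + canDelete(expired, live));
        System.out.println(current + " deletable: " + canDelete(current, live));
    }
}
```

Treating unparseable names as "keep" is a deliberately conservative choice: a cleanup hook that guesses wrong about an unknown file could destroy state that a retained checkpoint still needs.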

What's more, I think this issue is also related to FLINK-9043 if we plan to
refactor the directory structure. We would then only need to point to the
externalized metadata folder (or the job-id folder) so that a streaming job
could resume from the latest checkpoint in a user-friendly way.
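To illustrate how resuming could work if users only pointed at the job-id folder, here is a minimal sketch that picks the highest-numbered {{chk-<n>}} subdirectory containing a metadata file. It assumes the metadata file is named {{_metadata}}, as in Flink's file-system checkpoint storage, and that the file exists only once a checkpoint has completed; {{LatestCheckpointFinder}} and {{findLatestCheckpoint}} are hypothetical, not Flink APIs. Only the job-id folder is listed, never the potentially large state folders.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

class LatestCheckpointFinder {

    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");
    // Assumed name of the metadata file written when a checkpoint completes.
    private static final String METADATA_FILE = "_metadata";

    /** Hypothetical helper: the newest chk-<n> directory under jobDir that has a metadata file. */
    static Optional<Path> findLatestCheckpoint(Path jobDir) {
        // Lists only the job-id folder; inside each chk-<n> it checks one known file instead of listing.
        try (Stream<Path> children = Files.list(jobDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(dir -> CHK_DIR.matcher(dir.getFileName().toString()).matches())
                    .filter(dir -> Files.isRegularFile(dir.resolve(METADATA_FILE)))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointIdOf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static long checkpointIdOf(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        if (!m.matches()) {
            throw new IllegalArgumentException("not a checkpoint directory: " + chkDir);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        Path jobDir = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(findLatestCheckpoint(jobDir)
                .map(Path::toString)
                .orElse("no completed checkpoint found"));
    }
}
```

Checkpoint ids are compared numerically rather than by name, since "chk-10" sorts before "chk-9" as a string; an in-progress {{chk-<n>}} without metadata is skipped.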

We have already done some work on this issue, but I see that [~yanghua] has
already taken it. What's your plan to resolve it?

> CheckpointCoordinator does not delete checkpoint directory of late/failed 
> checkpoints
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-10855
>                 URL: https://issues.apache.org/jira/browse/FLINK-10855
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: vinoyang
>            Priority: Major
>
> In case an acknowledge checkpoint message is late or a checkpoint cannot
> be acknowledged, we discard the subtask state in the
> {{CheckpointCoordinator}}. What does not happen in this case is that we
> delete the parent directory of the checkpoint. This only happens when we
> dispose a {{PendingCheckpoint}} via {{PendingCheckpoint#dispose}}.
> Due to this behaviour, it can happen that a checkpoint fails (e.g. a task is
> not ready) and we delete the checkpoint directory. Next, another task writes
> its checkpoint data to the checkpoint directory (thereby creating it again)
> and sends an acknowledge message back to the {{CheckpointCoordinator}}. The
> {{CheckpointCoordinator}} will realize that there is no longer a
> {{PendingCheckpoint}} and will discard the subtask state. This will remove
> the state files from the checkpoint directory but will leave the checkpoint
> directory itself untouched.
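One possible direction for the bug described in the issue, sketched under assumptions (the issue itself does not prescribe a fix): after discarding the state files of a late or declined acknowledgement, the coordinator also attempts a non-recursive delete of the checkpoint directory. A non-recursive delete only succeeds once the directory is empty, so it cannot remove files that another late task has already written. {{LateAckCleanup}} and {{discardLateSubtaskState}} are hypothetical names, not the actual {{CheckpointCoordinator}} code; java.nio is used only to keep the sketch self-contained, whereas Flink code would go through its own {{FileSystem#delete(path, recursive)}}.

```java
import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

class LateAckCleanup {

    /**
     * Hypothetical handling of an acknowledgement whose PendingCheckpoint is already gone:
     * discard the subtask's state files, then try to remove the checkpoint directory too.
     *
     * @return true if the checkpoint directory no longer exists afterwards
     */
    static boolean discardLateSubtaskState(Path checkpointDir, List<Path> stateFiles)
            throws IOException {
        for (Path file : stateFiles) {
            Files.deleteIfExists(file); // what already happens today: drop the state files
        }
        try {
            // Non-recursive delete: only succeeds if no other task has files left in it.
            Files.deleteIfExists(checkpointDir);
        } catch (DirectoryNotEmptyException e) {
            // Other files remain; if their owner's acknowledgement is also late, it retries.
        }
        return Files.notExists(checkpointDir);
    }

    public static void main(String[] args) throws IOException {
        Path chk = Files.createTempDirectory("chk-5");
        Path a = Files.createFile(chk.resolve("task-a-state"));
        Path b = Files.createFile(chk.resolve("task-b-state"));
        System.out.println("after late ack of a, dir gone: " + discardLateSubtaskState(chk, List.of(a)));
        System.out.println("after late ack of b, dir gone: " + discardLateSubtaskState(chk, List.of(b)));
    }
}
```

This only cleans up directories that have become empty; it does not prevent a still-running task from failing when its parent folder disappears, which is part of the motivation for separating metadata and exclusive-state folders.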



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
