[ https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848118#comment-16848118 ]

Stefan Richter edited comment on FLINK-8871 at 5/25/19 9:58 AM:
----------------------------------------------------------------

After this quite intense exchange, I was thinking a bit about how we can move 
forward with many of the topics related to checkpoint failure handling, 
cancellation, and cleanups. In particular, I think it would be good if we could 
prioritize the work and come up with a schedule that is as "conflict free" as 
possible. This schedule should consider the work that has already been done by 
different people and the best order in which we should get it into Flink. Let 
me try to summarize some of the different threads in this area:

- FLINK-11662: Without a proper cancellation scheme for running checkpoints in 
tasks, the JM currently eagerly deletes the checkpoint directory when a pending 
checkpoint is cancelled (e.g. when one task reported a failure). Tasks are not 
aware that a checkpoint they are working on has failed, so their attempt to 
finish their part of the checkpoint can fail with an IOException (directory was 
deleted by the JM), and under the current default exception handling of such 
failures (see {{ExecutionConfig.failTaskOnCheckpointError}}) this causes Task 
failures that restart the job. A lot about solutions to this problem is 
actually contained in the discussions under FLINK-10930, so I wanted to recall 
this a little bit. It seems like the solution could come from two ends, and in 
the end it might be good to address both sides: A) avoid that checkpoint 
failures (in particular in the async part) can cause a task to fail. This 
should be the new behavior; the current default is problematic. B) Take care 
of a better deletion of checkpoint directories on the JM - this is what some of 
the other issues below are about. *IMO, point A) is the most important issue 
among all that we have to address - please read FLINK-10930 again, the 
comments from [~StephanEwen] have valuable information for this issue.*
- FLINK-8871: This issue, introducing a mechanism that makes tasks aware that a 
checkpoint failed and was cancelled by decision of the JM. This is part of B) 
from my previous point; it will lead to cleaner behavior and avoid unnecessary 
work on the task side. FLINK-10966 looks like one subtask in the context of 
this larger effort.
- FLINK-10724, FLINK-12209, et al.: Refactoring of failure reporting (unified 
places, messages); the intention is to have decisions about how to react to 
failures only in the JM, which enables the "fail job after n failed 
checkpoints" feature. From what I observed on the PR from [~yanghua], it might 
be that this would benefit from *solving FLINK-11662 first* as well, because 
otherwise we will not be able to have all control over checkpoint failures only 
in the JM, as a failing task can fail the job outside of the failure manager's 
control.
- FLINK-10855: The CheckpointCoordinator does not delete the checkpoint 
directory of late or failed checkpoints under some circumstances explained in 
the issue. Depending on some details, I think this might benefit from the clean 
cancellation mechanism of FLINK-8871.
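To make point A) a bit more concrete, here is a minimal, hypothetical sketch (all names such as {{CheckpointResponder}} and {{runAsyncPart}} are illustrative, not Flink's actual classes): the async part of a snapshot catches its own failure and declines the checkpoint at the JM instead of letting the exception fail the task.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointFailureSketch {

    /** Stand-in for the task-to-JM channel; real Flink differs. */
    interface CheckpointResponder {
        void declineCheckpoint(long checkpointId, Throwable reason);
    }

    /**
     * Point A sketch: the async snapshot part catches its own failure and
     * declines the checkpoint at the JM instead of failing the whole task.
     */
    static boolean runAsyncPart(long checkpointId, Runnable asyncSnapshot,
                                CheckpointResponder responder) {
        try {
            asyncSnapshot.run();
            return true; // async snapshot part completed normally
        } catch (Exception e) {
            // e.g. IOException because the JM already deleted the checkpoint
            // directory; report it instead of propagating, so the task lives on
            responder.declineCheckpoint(checkpointId, e);
            return false;
        }
    }

    public static void main(String[] args) {
        List<Long> declined = new ArrayList<>();
        CheckpointResponder responder = (id, reason) -> declined.add(id);

        // simulate the async part failing because the directory is gone
        boolean ok = runAsyncPart(17L, () -> {
            throw new RuntimeException("checkpoint directory deleted by JM");
        }, responder);

        System.out.println("task still alive, declined checkpoints: " + declined);
    }
}
```

The key point is only that the exception stops at the checkpoint boundary and turns into a decline message, rather than reaching the task's failure handling.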

There are duplicates for some of the issues; I tried to pick the most 
representative ones. I also tried to prioritize them in order of importance and 
critical path. [~yunta] and [~yanghua], can you discuss what work was already 
done in your teams and divide those tasks in a conflict-free way? I think there 
should be enough work for everybody. I would suggest first discussing among 
yourselves and then making the plan public. If we have competing solutions for 
some parts, we can discuss them in the community and pick the most suitable 
one, or maybe fuse the best ideas from both. Wdyt? Is my list somewhat 
complete, or did I overlook something crucial?
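As a rough illustration of the "fail job after n failed checkpoints" idea mentioned for FLINK-10724, the JM-side bookkeeping could look roughly like the following (purely a sketch with made-up names, not the proposed implementation):

```java
/**
 * Hypothetical JM-side counter for "fail job after n failed checkpoints".
 * Tasks only report outcomes; only the JM decides to fail the job.
 */
public class CheckpointFailureCounter {

    private final int tolerableFailures;  // the "n" that is tolerated
    private final Runnable failJobAction; // what the JM does past the threshold
    private int consecutiveFailures = 0;

    public CheckpointFailureCounter(int tolerableFailures, Runnable failJobAction) {
        this.tolerableFailures = tolerableFailures;
        this.failJobAction = failJobAction;
    }

    /** The JM feeds every checkpoint outcome in here. */
    public synchronized void onCheckpointResult(boolean succeeded) {
        if (succeeded) {
            consecutiveFailures = 0; // a successful checkpoint resets the count
        } else if (++consecutiveFailures > tolerableFailures) {
            failJobAction.run();
        }
    }

    public static void main(String[] args) {
        final boolean[] jobFailed = {false};
        CheckpointFailureCounter counter =
                new CheckpointFailureCounter(2, () -> jobFailed[0] = true);

        counter.onCheckpointResult(false); // 1st failure: tolerated
        counter.onCheckpointResult(false); // 2nd failure: tolerated
        counter.onCheckpointResult(true);  // success resets the counter
        counter.onCheckpointResult(false);
        counter.onCheckpointResult(false);
        counter.onCheckpointResult(false); // 3rd consecutive failure exceeds n=2
        System.out.println("job failed? " + jobFailed[0]);
    }
}
```

This only works cleanly if tasks cannot fail the job on their own after a checkpoint error, which is why solving FLINK-11662 first matters.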



> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8871
>                 URL: https://issues.apache.org/jira/browse/FLINK-8871
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>            Reporter: Stefan Richter
>            Priority: Critical
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer count against the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints have already started while the abandoned snapshot 
> thread from a previous checkpoint is still lingering around. This scenario 
> can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.
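A minimal sketch of how such a {{triggerCheckpoint}}/{{cancelCheckpoint}} pair could track task-local snapshot work (names are illustrative; the actual TaskManagerGateway in Flink looks different):

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical task-side bookkeeping for the proposed trigger/cancel pair;
 * not Flink's real interface, just an illustration of the mechanism.
 */
public class TaskCheckpointTracker {

    // ids of checkpoints whose task-local snapshot work is still running
    private final Set<Long> runningSnapshots = new HashSet<>();

    /** Invoked by the checkpoint coordinator to start task-local snapshot work. */
    public synchronized void triggerCheckpoint(long checkpointId) {
        runningSnapshots.add(checkpointId);
    }

    /**
     * Proposed counterpart: invoked by the coordinator when it cancels the
     * checkpoint, so the task can stop its snapshot thread instead of letting
     * it linger alongside later checkpoints.
     */
    public synchronized boolean cancelCheckpoint(long checkpointId) {
        return runningSnapshots.remove(checkpointId);
    }

    public synchronized boolean isSnapshotRunning(long checkpointId) {
        return runningSnapshots.contains(checkpointId);
    }

    public static void main(String[] args) {
        TaskCheckpointTracker tracker = new TaskCheckpointTracker();
        tracker.triggerCheckpoint(42L);
        System.out.println("running: " + tracker.isSnapshotRunning(42L));
        tracker.cancelCheckpoint(42L);
        System.out.println("running after cancel: " + tracker.isSnapshotRunning(42L));
    }
}
```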



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
