You are right. The problem is that Flink tries three times to cancel the
call and every RPC call has a timeout of 10s. Since the machine on which
the Task ran has died, it will take that long until the system decides to
fail the Task instead [1].

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1390

Cheers,
Till

On Thu, Aug 20, 2020 at 6:17 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
wrote:

> Hi Till,
>
> Thanks for the quick reply.
>
> Yes, the job actually restarts twice, the metric fullRestarts also
> indicates this, its value is 2.
> But the job indeed takes around 30s to switch from CANCELLING to
> RESTARTING in its first restart.
> I just wonder why it takes so long here?
>
> Also, even I set the heartbeat timeout from default 50s to 5s, this time
> is similar, so I think this is nothing about the heartbeat timeout.
>
> Regards,
> Zhinan
>
> On Fri, 21 Aug 2020 at 00:02, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Zhinan,
>>
>> the logs show that the cancellation does not take 30s. What happens is
>> that the job gets restarted a couple of times. The problem seems to be that
>> one TaskManager died permanently but it takes the heartbeat timeout
>> (default 50s) until it is detected as dead. In the meantime the system
>> tries to redeploy tasks which will cause the job to fail again and again.
>>
>> Cheers,
>> Till
>>
>> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Sorry for the late reply.
>>> Attached is the log of jobmanager.
>>> I notice that during canceling the job, the jobmanager also warns that
>>> the connections to the failed taskmanager is lost.
>>> And this lasts for about 30s, and then the jobmanager
>>> successfully cancels the operator instances that related to the
>>> failed taskmanager and restarts the job.
>>> Does there anyway help reduce the restart time?
>>>
>>> Thanks a lot.
>>>
>>> Regards,
>>> Zhinan
>>>
>>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Could you share the logs with us? This might help to explain why the
>>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>>> mechanism.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> > for i) the cancellation depends on the user code. If the user code
>>>>> does a blocking operation, Flink needs to wait until it returns from there
>>>>> before it can move the Task's state to CANCELED.
>>>>> for this, my code just includes a map operation and then aggregates
>>>>> the results into a tumbling window. So I think in this case the time is 
>>>>> not
>>>>> attributed to the code.
>>>>> I looked into the log, during the period, I observed that the
>>>>> jobmanager continues warning that its connection to the failed
>>>>> taskmanager is confused.
>>>>> I am not sure if this is the reason that delays the canceling, do you
>>>>> have any ideas about this?
>>>>>
>>>>> I am also looking the deadthwatch mechanism [1] of Akka to see if this
>>>>> is the reason.
>>>>>
>>>>> For (ii), I will open the JIRA issue for your mention.
>>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>>
>>>>> Regards.
>>>>> Zhinan
>>>>>
>>>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Zhinan,
>>>>>>
>>>>>> for i) the cancellation depends on the user code. If the user code
>>>>>> does a blocking operation, Flink needs to wait until it returns from 
>>>>>> there
>>>>>> before it can move the Task's state to CANCELED.
>>>>>>
>>>>>> for ii) I think your observation is correct. Could you please open a
>>>>>> JIRA issue for this problem so that it can be fixed in Flink? Thanks a 
>>>>>> lot!
>>>>>>
>>>>>> For the time to restore the checkpoints it could also be interesting
>>>>>> to add a proper metric to Flink. Hence, you could also create a JIRA 
>>>>>> issue
>>>>>> for it.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Yun,
>>>>>>>
>>>>>>> Thanks a lot for your help. Seems hard to measure the checkpointing
>>>>>>> restore time currently.
>>>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and
>>>>>>> "downtime" to observe some information about failure recovery.
>>>>>>>
>>>>>>> Still some confusions:
>>>>>>> i) I found the time for the jobmanager to make the job from status
>>>>>>> CANCELING to status CANCELED up to 30s?
>>>>>>>      Is there any reason why it takes so long? Can I reduce this
>>>>>>> time?
>>>>>>> ii) Currently the way to calculate the "downtime"  is not consistent
>>>>>>> with the description in the doc, now the downtime is actually the 
>>>>>>> current
>>>>>>> timestamp minus the time timestamp when the job started.
>>>>>>>     But I think the doc obviously only want to measure the current
>>>>>>> timestamp minus the timestamp when the job failed.
>>>>>>>
>>>>>>> I think I need to measure these times by adding specified metrics
>>>>>>> myself.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Zhinan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 19 Aug 2020 at 01:45, Yun Tang <myas...@live.com> wrote:
>>>>>>>
>>>>>>>> Hi Zhinan,
>>>>>>>>
>>>>>>>> For the time to detect the failure, you could refer to the time
>>>>>>>> when 'fullRestarts' increase. That could give you information about the
>>>>>>>> time of job failure.
>>>>>>>>
>>>>>>>> For the checkpoint recovery time, there actually exist two parts:
>>>>>>>>
>>>>>>>>    1. The time to read checkpoint meta in JM. However, this
>>>>>>>>    duration of time has no explicit metrics currently as that part of 
>>>>>>>> duration
>>>>>>>>    would be nearly just reading 1 MB file remotely from DFS.
>>>>>>>>    2. The time for tasks to restore state. This should be treated
>>>>>>>>    as the real time for checkpoint recovery and could even be 10 
>>>>>>>> minutes+ when
>>>>>>>>    restoring savepoint. Unfortunately, this part of time is also not 
>>>>>>>> recorded
>>>>>>>>    in metrics now.
>>>>>>>>    If you find the task is in RUNNING state but not consume any
>>>>>>>>    record, that might be stuck in restoring checkpoint/savepoint.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Yun Tang
>>>>>>>> ------------------------------
>>>>>>>> *From:* Zhinan Cheng <chingchi...@gmail.com>
>>>>>>>> *Sent:* Tuesday, August 18, 2020 11:50
>>>>>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>>>>>> *Subject:* Flink checkpoint recovery time
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am working on measuring the failure recovery time of Flink and I
>>>>>>>> want to decompose the recovery time into different parts, say the time 
>>>>>>>> to
>>>>>>>> detect the failure, the time to restart the job, and the time to
>>>>>>>> restore the checkpointing.
>>>>>>>>
>>>>>>>> I found that I can measure the down time during failure and the
>>>>>>>> time to restart the job and some metric for the checkpointing as below.
>>>>>>>>
>>>>>>>> [image: measure.png]
>>>>>>>> Unfortunately, I cannot find any information about the failure
>>>>>>>> detect time and checkpoint recovery time, Is there any way that Flink 
>>>>>>>> has
>>>>>>>> provided for this, otherwise, how can I solve this?
>>>>>>>>
>>>>>>>> Thanks a lot for your help.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>

Reply via email to