Hi Till,

Thanks for the reply.
Is the 10s timeout here always necessary?
Can I reduce this value to shorten the restart time of the job?
I currently cannot find this setting in Flink's configuration.
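My best guess is that the 10s comes from the RPC ask timeout, i.e.
something like this in flink-conf.yaml (assuming it maps to
akka.ask.timeout; please correct me if I am wrong):

    akka.ask.timeout: 5 s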

Regards,
Zhinan


On Fri, 21 Aug 2020 at 15:28, Till Rohrmann <trohrm...@apache.org> wrote:

> You are right. The problem is that Flink tries the cancel RPC call three
> times, and every RPC call has a timeout of 10s. Since the machine on which
> the Task ran has died, every attempt times out, so it takes roughly
> 3 x 10s = 30s until the system decides to fail the Task instead [1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1390
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 6:17 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
> wrote:
>
>> Hi Till,
>>
>> Thanks for the quick reply.
>>
>> Yes, the job actually restarts twice; the fullRestarts metric also
>> indicates this, as its value is 2.
>> But the job indeed takes around 30s to switch from CANCELLING to
>> RESTARTING during its first restart.
>> I just wonder why it takes so long.
>>
>> Also, even if I reduce the heartbeat timeout from the default 50s to 5s,
>> this time stays about the same, so I think it is unrelated to the
>> heartbeat timeout.
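>>
>> For reference, here is what I changed in flink-conf.yaml (a minimal
>> sketch; I assume both values are in milliseconds and that the interval
>> must stay below the timeout):
>>
>>     heartbeat.interval: 1000
>>     heartbeat.timeout: 5000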
>>
>> Regards,
>> Zhinan
>>
>> On Fri, 21 Aug 2020 at 00:02, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Zhinan,
>>>
>>> the logs show that the cancellation does not take 30s. What happens is
>>> that the job gets restarted a couple of times. The problem seems to be that
>>> one TaskManager died permanently but it takes the heartbeat timeout
>>> (default 50s) until it is detected as dead. In the meantime the system
>>> tries to redeploy tasks which will cause the job to fail again and again.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Sorry for the late reply.
>>>> Attached is the log of jobmanager.
>>>> I notice that while canceling the job, the jobmanager also warns that
>>>> the connection to the failed taskmanager is lost.
>>>> This lasts for about 30s; then the jobmanager successfully cancels the
>>>> operator instances related to the failed taskmanager and restarts the
>>>> job.
>>>> Is there any way to reduce the restart time?
>>>>
>>>> Thanks a lot.
>>>>
>>>> Regards,
>>>> Zhinan
>>>>
>>>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Could you share the logs with us? This might help to explain why the
>>>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>>>> mechanism.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> Thanks for the quick response.
>>>>>>
>>>>>> > for i) the cancellation depends on the user code. If the user code
>>>>>> > does a blocking operation, Flink needs to wait until it returns from
>>>>>> > there before it can move the Task's state to CANCELED.
>>>>>> For this, my code just includes a map operation and then aggregates
>>>>>> the results into a tumbling window. So I think in this case the time
>>>>>> cannot be attributed to the user code.
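>>>>>>
>>>>>> Roughly, the job looks like this (a minimal sketch with illustrative
>>>>>> names and a stand-in source/aggregation, not my exact code):
>>>>>>
>>>>>>     import org.apache.flink.api.common.typeinfo.Types;
>>>>>>     import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>     import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
>>>>>>     import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>>
>>>>>>     public class MapThenWindowJob {
>>>>>>         public static void main(String[] args) throws Exception {
>>>>>>             StreamExecutionEnvironment env =
>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>             env.socketTextStream("localhost", 9999)   // stand-in source
>>>>>>                .map(line -> Tuple2.of(line, 1))       // the map step
>>>>>>                .returns(Types.TUPLE(Types.STRING, Types.INT))
>>>>>>                .keyBy(t -> t.f0)
>>>>>>                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
>>>>>>                .sum(1)                 // tumbling-window aggregation
>>>>>>                .print();
>>>>>>             env.execute("map-plus-tumbling-window");
>>>>>>         }
>>>>>>     }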
>>>>>> I looked into the log, and during that period I observed that the
>>>>>> jobmanager keeps warning that its connection to the failed
>>>>>> taskmanager is lost.
>>>>>> I am not sure if this is what delays the canceling. Do you have any
>>>>>> ideas about this?
>>>>>>
>>>>>> I am also looking into Akka's death watch mechanism [1] to see if
>>>>>> this is the reason.
>>>>>>
>>>>>> For (ii), I will open the JIRA issue as you suggested.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>>>
>>>>>> Regards,
>>>>>> Zhinan
>>>>>>
>>>>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Zhinan,
>>>>>>>
>>>>>>> for i) the cancellation depends on the user code. If the user code
>>>>>>> does a blocking operation, Flink needs to wait until it returns from
>>>>>>> there before it can move the Task's state to CANCELED.
>>>>>>>
>>>>>>> for ii) I think your observation is correct. Could you please open a
>>>>>>> JIRA issue for this problem so that it can be fixed in Flink? Thanks
>>>>>>> a lot!
>>>>>>>
>>>>>>> For the time to restore the checkpoints it could also be interesting
>>>>>>> to add a proper metric to Flink. Hence, you could also create a JIRA
>>>>>>> issue for it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng <
>>>>>>> znch...@cse.cuhk.edu.hk> wrote:
>>>>>>>
>>>>>>>> Hi Yun,
>>>>>>>>
>>>>>>>> Thanks a lot for your help. It seems hard to measure the checkpoint
>>>>>>>> restore time currently.
>>>>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and
>>>>>>>> "downtime" to observe some information about failure recovery.
>>>>>>>>
>>>>>>>> Still, some confusion remains:
>>>>>>>> i) I found that the jobmanager takes up to 30s to move the job from
>>>>>>>> status CANCELING to status CANCELED.
>>>>>>>>      Is there any reason why it takes so long? Can I reduce this
>>>>>>>> time?
>>>>>>>> ii) Currently the way the "downtime" metric is calculated is not
>>>>>>>> consistent with the description in the doc: right now the downtime
>>>>>>>> is actually the current timestamp minus the timestamp when the job
>>>>>>>> started.
>>>>>>>>     But I think the doc clearly intends to measure the current
>>>>>>>> timestamp minus the timestamp when the job failed.
>>>>>>>>
>>>>>>>> I think I need to measure these times by adding dedicated metrics
>>>>>>>> myself.
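>>>>>>>>
>>>>>>>> A minimal sketch of what I have in mind (a custom gauge registered
>>>>>>>> in an operator; the metric name and class are mine, not existing
>>>>>>>> Flink metrics):
>>>>>>>>
>>>>>>>>     import org.apache.flink.api.common.functions.RichMapFunction;
>>>>>>>>     import org.apache.flink.configuration.Configuration;
>>>>>>>>     import org.apache.flink.metrics.Gauge;
>>>>>>>>
>>>>>>>>     public class RestoreMarkerMap extends RichMapFunction<String, String> {
>>>>>>>>         private transient long openedAt;
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void open(Configuration parameters) {
>>>>>>>>             // open() runs after state restore, so this timestamp
>>>>>>>>             // approximates when recovery finished for this task
>>>>>>>>             openedAt = System.currentTimeMillis();
>>>>>>>>             getRuntimeContext().getMetricGroup()
>>>>>>>>                 .gauge("taskOpenedAt", (Gauge<Long>) () -> openedAt);
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public String map(String value) {
>>>>>>>>             return value;
>>>>>>>>         }
>>>>>>>>     }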
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Zhinan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 19 Aug 2020 at 01:45, Yun Tang <myas...@live.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Zhinan,
>>>>>>>>>
>>>>>>>>> For the time to detect the failure, you could refer to the time
>>>>>>>>> when 'fullRestarts' increases. That could give you information
>>>>>>>>> about the time of the job failure.
>>>>>>>>>
>>>>>>>>> For the checkpoint recovery time, there actually exist two parts:
>>>>>>>>>
>>>>>>>>>    1. The time to read the checkpoint meta in the JM. However, this
>>>>>>>>>    duration currently has no explicit metric, as that part is
>>>>>>>>>    nearly just reading a 1 MB file remotely from DFS.
>>>>>>>>>    2. The time for tasks to restore state. This should be treated
>>>>>>>>>    as the real time for checkpoint recovery and could even be
>>>>>>>>>    10+ minutes when restoring a savepoint. Unfortunately, this part
>>>>>>>>>    is also not recorded in metrics now.
>>>>>>>>>    If you find a task is in RUNNING state but not consuming any
>>>>>>>>>    records, it might be stuck restoring the checkpoint/savepoint
>>>>>>>>>    (one way to observe this is sketched below).
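>>>>>>>>>
>>>>>>>>> If you want a rough measurement yourself, one option is to log
>>>>>>>>> timestamps around state initialization (a sketch, not an existing
>>>>>>>>> Flink feature; it assumes initializeState() runs during recovery):
>>>>>>>>>
>>>>>>>>>     import org.apache.flink.api.common.functions.RichMapFunction;
>>>>>>>>>     import org.apache.flink.runtime.state.FunctionInitializationContext;
>>>>>>>>>     import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>>>>>>>>     import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>>>>>>>>     import org.slf4j.Logger;
>>>>>>>>>     import org.slf4j.LoggerFactory;
>>>>>>>>>
>>>>>>>>>     public class RestoreTimedMap extends RichMapFunction<String, String>
>>>>>>>>>             implements CheckpointedFunction {
>>>>>>>>>
>>>>>>>>>         private static final Logger LOG =
>>>>>>>>>             LoggerFactory.getLogger(RestoreTimedMap.class);
>>>>>>>>>         private transient long initializedAt;
>>>>>>>>>         private transient boolean firstRecordSeen;
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void initializeState(FunctionInitializationContext context) {
>>>>>>>>>             // runs while the task is recovering; isRestored() tells
>>>>>>>>>             // us whether we came back from a checkpoint/savepoint
>>>>>>>>>             initializedAt = System.currentTimeMillis();
>>>>>>>>>             LOG.info("initializeState, restored={}", context.isRestored());
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void snapshotState(FunctionSnapshotContext context) {
>>>>>>>>>             // nothing to snapshot in this sketch
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public String map(String value) {
>>>>>>>>>             if (!firstRecordSeen) {
>>>>>>>>>                 firstRecordSeen = true;
>>>>>>>>>                 LOG.info("first record {} ms after initializeState",
>>>>>>>>>                     System.currentTimeMillis() - initializedAt);
>>>>>>>>>             }
>>>>>>>>>             return value;
>>>>>>>>>         }
>>>>>>>>>     }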
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Yun Tang
>>>>>>>>> ------------------------------
>>>>>>>>> *From:* Zhinan Cheng <chingchi...@gmail.com>
>>>>>>>>> *Sent:* Tuesday, August 18, 2020 11:50
>>>>>>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>>>>>>> *Subject:* Flink checkpoint recovery time
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am working on measuring the failure recovery time of Flink, and I
>>>>>>>>> want to decompose the recovery time into different parts: the time
>>>>>>>>> to detect the failure, the time to restart the job, and the time to
>>>>>>>>> restore the checkpoint.
>>>>>>>>>
>>>>>>>>> I found that I can measure the downtime during a failure, the time
>>>>>>>>> to restart the job, and some metrics for checkpointing, as shown
>>>>>>>>> below.
>>>>>>>>>
>>>>>>>>> [image: measure.png]
>>>>>>>>> Unfortunately, I cannot find any information about the failure
>>>>>>>>> detection time or the checkpoint recovery time. Is there anything
>>>>>>>>> that Flink provides for this? Otherwise, how can I solve this?
>>>>>>>>>
>>>>>>>>> Thanks a lot for your help.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>
