Hi Till,

Thanks for the quick response.

> for i) the cancellation depends on the user code. If the user code does a
> blocking operation, Flink needs to wait until it returns from there before
> it can move the Task's state to CANCELED.
For this, my job just consists of a map operation followed by a tumbling
window aggregation, so I don't think the cancellation delay can be
attributed to blocking user code.
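
For reference, the job is roughly shaped like the sketch below. This is a
minimal reconstruction for illustration only: the actual source, element
types, and window size are assumptions on my part, not the exact code.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MapThenWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical source; the real job's source is not shown here.
        env.socketTextStream("localhost", 9999)
            // Simple, non-blocking map.
            .map(value -> Tuple2.of(value, 1))
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(value -> value.f0)
            // Tumbling window; the 5s size is only an example.
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .sum(1)
            .print();

        env.execute("map-then-tumbling-window");
    }
}

The map here does nothing blocking, which is why I would not expect the
user code to hold up cancellation.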
I looked into the logs, and during that period the jobmanager kept
logging warnings about its connection to the failed taskmanager.
I am not sure whether this is what delays the canceling. Do you have
any ideas about this?

I am also looking into the DeathWatch mechanism [1] of Akka to see if this
is the reason.
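
From what I read, DeathWatch lets one actor watch another and receive a
Terminated message once the watched actor stops or is declared unreachable
by the failure detector. Below is a minimal sketch using Akka's Java API,
just to illustrate the mechanism I mean; it is not Flink code and the class
name is made up:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Terminated;

// Illustration of Akka DeathWatch: the watcher registers interest in
// another actor and gets a Terminated message when that actor stops or
// is flagged as unreachable by the remote failure detector.
public class Watcher extends AbstractActor {

    public Watcher(ActorRef watched) {
        // Register DeathWatch on the target actor.
        getContext().watch(watched);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Terminated.class, t ->
                System.out.println("Watched actor terminated: " + t.actor()))
            .build();
    }
}

If the failure detector takes a while to declare the taskmanager
unreachable, I guess that could explain part of the delay before the job
reaches CANCELED.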

For (ii), I will open a JIRA issue as you suggested.

Thanks.


[1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors

Regards,
Zhinan

On Wed, 19 Aug 2020 at 15:39, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Zhinan,
>
> for i) the cancellation depends on the user code. If the user code does a
> blocking operation, Flink needs to wait until it returns from there before
> it can move the Task's state to CANCELED.
>
> for ii) I think your observation is correct. Could you please open a JIRA
> issue for this problem so that it can be fixed in Flink? Thanks a lot!
>
> For the time to restore the checkpoints it could also be interesting to
> add a proper metric to Flink. Hence, you could also create a JIRA issue for
> it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
> wrote:
>
>> Hi Yun,
>>
>> Thanks a lot for your help. It seems hard to measure the checkpoint
>> restore time currently.
>> I do monitor the "fullRestarts" metric and others like "uptime" and
>> "downtime" to observe some information about failure recovery.
>>
>> Still, I have a few questions:
>> i) I found that it takes the jobmanager up to 30s to move the job from
>> status CANCELING to status CANCELED.
>>      Is there any reason why it takes so long? Can I reduce this time?
>> ii) Currently the way the "downtime" metric is calculated is not consistent
>> with the description in the doc: right now it is actually the current
>> timestamp minus the timestamp when the job started.
>>     But I think the doc clearly means to measure the current timestamp
>> minus the timestamp when the job failed.
>>
>> I think I need to measure these times by adding dedicated metrics myself.
>>
>> Regards,
>> Zhinan
>>
>>
>>
>>
>> On Wed, 19 Aug 2020 at 01:45, Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Zhinan,
>>>
>>> For the time to detect the failure, you could refer to the time when
>>> 'fullRestarts' increases. That could give you information about when the
>>> job failed.
>>>
>>> For the checkpoint recovery time, there actually exist two parts:
>>>
>>>    1. The time to read the checkpoint metadata in the JM. This duration
>>>    currently has no explicit metric, as it mostly amounts to reading a
>>>    file of roughly 1 MB remotely from DFS.
>>>    2. The time for tasks to restore their state. This should be treated as
>>>    the real checkpoint recovery time and can even exceed 10 minutes when
>>>    restoring from a savepoint. Unfortunately, this part is also not
>>>    recorded in metrics at the moment.
>>>    If you find that a task is in RUNNING state but not consuming any
>>>    records, it might be stuck restoring a checkpoint/savepoint.
>>>
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Zhinan Cheng <chingchi...@gmail.com>
>>> *Sent:* Tuesday, August 18, 2020 11:50
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Flink checkpoint recovery time
>>>
>>> Hi all,
>>>
>>> I am working on measuring the failure recovery time of Flink and I want
>>> to decompose the recovery time into different parts, say the time to detect
>>> the failure, the time to restart the job, and the time to
>>> restore from the checkpoint.
>>>
>>> I found that I can measure the downtime during the failure, the time to
>>> restart the job, and some metrics for checkpointing, as shown below.
>>>
>>> [image: measure.png]
>>> Unfortunately, I cannot find any information about the failure detection
>>> time and the checkpoint recovery time. Is there anything Flink provides
>>> for this? Otherwise, how can I solve this?
>>>
>>> Thanks a lot for your help.
>>>
>>> Regards,
>>>
>>
