You are right. The problem is that Flink tries three times to cancel the task, and each cancel RPC call has a timeout of 10s. Since the machine on which the Task ran has died, it takes that long (3 x 10s, which matches the roughly 30s you observed) until the system decides to fail the Task instead [1].

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1390
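As a rough sketch of how these timeouts could be lowered for a local test run (the config keys akka.ask.timeout, heartbeat.interval, and heartbeat.timeout are assumptions about the relevant knobs and should be checked against the configuration documentation of your Flink version; on a real cluster they would normally be set in flink-conf.yaml rather than in code):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LowTimeoutLocalEnv {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Worst case described above: 3 cancel attempts x 10s per RPC call
        // before the Task on the dead machine is finally failed. Lowering the
        // RPC (ask) timeout shortens each attempt. Key name is an assumption.
        conf.setString("akka.ask.timeout", "5 s");

        // Detect a dead TaskManager faster than the default 50s heartbeat
        // timeout. Key names and units (milliseconds) are assumptions.
        conf.setString("heartbeat.interval", "1000");
        conf.setString("heartbeat.timeout", "5000");

        // For a local MiniCluster the configuration can be passed directly;
        // on standalone/YARN clusters it belongs in flink-conf.yaml.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);

        env.fromElements(1, 2, 3).print();
        env.execute("low-timeout sketch");
    }
}

Note that shortening these timeouts trades faster failure detection for a higher risk of false positives under load, so values like the above are only meant for experimentation.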
Cheers,
Till

On Thu, Aug 20, 2020 at 6:17 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk> wrote:

> Hi Till,
>
> Thanks for the quick reply.
>
> Yes, the job actually restarts twice; the metric fullRestarts also
> indicates this, its value is 2.
> But the job indeed takes around 30s to switch from CANCELLING to
> RESTARTING in its first restart.
> I just wonder why it takes so long here?
>
> Also, even when I set the heartbeat timeout from the default 50s to 5s,
> this time stays about the same, so I think it has nothing to do with the
> heartbeat timeout.
>
> Regards,
> Zhinan
>
> On Fri, 21 Aug 2020 at 00:02, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Zhinan,
>>
>> the logs show that the cancellation does not take 30s. What happens is
>> that the job gets restarted a couple of times. The problem seems to be
>> that one TaskManager died permanently, but it takes the heartbeat timeout
>> (default 50s) until it is detected as dead. In the meantime the system
>> tries to redeploy tasks, which causes the job to fail again and again.
>>
>> Cheers,
>> Till
>>
>> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Sorry for the late reply.
>>> Attached is the log of the jobmanager.
>>> I notice that while canceling the job, the jobmanager also warns that
>>> the connection to the failed taskmanager is lost.
>>> This lasts for about 30s, and then the jobmanager successfully cancels
>>> the operator instances related to the failed taskmanager and restarts
>>> the job.
>>> Is there any way to reduce this restart time?
>>>
>>> Thanks a lot.
>>>
>>> Regards,
>>> Zhinan
>>>
>>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Could you share the logs with us? This might help to explain why the
>>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>>> mechanism.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>>> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Thanks for the quick response.
>>>>>
>>>>> > for i) the cancellation depends on the user code. If the user code
>>>>> > does a blocking operation, Flink needs to wait until it returns from
>>>>> > there before it can move the Task's state to CANCELED.
>>>>>
>>>>> For this, my code just includes a map operation and then aggregates
>>>>> the results into a tumbling window. So I think in this case the time
>>>>> cannot be attributed to the code.
>>>>> I looked into the log; during that period, the jobmanager keeps
>>>>> warning that its connection to the failed taskmanager is lost.
>>>>> I am not sure if this is the reason that delays the canceling, do you
>>>>> have any ideas about this?
>>>>>
>>>>> I am also looking into the death watch mechanism [1] of Akka to see
>>>>> if this is the reason.
>>>>>
>>>>> For (ii), I will open the JIRA issue as you suggested.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>>
>>>>> Regards,
>>>>> Zhinan
>>>>>
>>>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Zhinan,
>>>>>>
>>>>>> for i) the cancellation depends on the user code. If the user code
>>>>>> does a blocking operation, Flink needs to wait until it returns from
>>>>>> there before it can move the Task's state to CANCELED.
>>>>>>
>>>>>> for ii) I think your observation is correct. Could you please open a
>>>>>> JIRA issue for this problem so that it can be fixed in Flink? Thanks
>>>>>> a lot!
>>>>>>
>>>>>> For the time to restore the checkpoints it could also be interesting
>>>>>> to add a proper metric to Flink. Hence, you could also create a JIRA
>>>>>> issue for it.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Yun,
>>>>>>>
>>>>>>> Thanks a lot for your help. It seems hard to measure the checkpoint
>>>>>>> restore time currently.
>>>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and
>>>>>>> "downtime" to observe some information about failure recovery.
>>>>>>>
>>>>>>> Still some confusion:
>>>>>>> i) I found that it takes the jobmanager up to 30s to move the job
>>>>>>> from status CANCELING to status CANCELED.
>>>>>>> Is there any reason why it takes so long? Can I reduce this time?
>>>>>>> ii) Currently the way the "downtime" is calculated is not consistent
>>>>>>> with the description in the doc: the downtime is actually the current
>>>>>>> timestamp minus the timestamp when the job started,
>>>>>>> but the doc clearly intends to measure the current timestamp minus
>>>>>>> the timestamp when the job failed.
>>>>>>>
>>>>>>> I think I need to measure these times by adding dedicated metrics
>>>>>>> myself.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Zhinan
>>>>>>>
>>>>>>> On Wed, 19 Aug 2020 at 01:45, Yun Tang <myas...@live.com> wrote:
>>>>>>>
>>>>>>>> Hi Zhinan,
>>>>>>>>
>>>>>>>> For the time to detect the failure, you could refer to the time
>>>>>>>> when 'fullRestarts' increases. That could give you information
>>>>>>>> about the time of the job failure.
>>>>>>>>
>>>>>>>> For the checkpoint recovery time, there are actually two parts:
>>>>>>>>
>>>>>>>> 1. The time to read the checkpoint metadata in the JM. This
>>>>>>>> duration currently has no explicit metric, as it is mostly just
>>>>>>>> reading a ~1 MB file remotely from DFS.
>>>>>>>> 2. The time for tasks to restore state. This should be treated as
>>>>>>>> the real time for checkpoint recovery and can even be 10+ minutes
>>>>>>>> when restoring a savepoint. Unfortunately, this part is also not
>>>>>>>> recorded in metrics now.
>>>>>>>> If you find a task in RUNNING state that does not consume any
>>>>>>>> records, it might be stuck restoring the checkpoint/savepoint.
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Yun Tang
>>>>>>>> ------------------------------
>>>>>>>> *From:* Zhinan Cheng <chingchi...@gmail.com>
>>>>>>>> *Sent:* Tuesday, August 18, 2020 11:50
>>>>>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>>>>>> *Subject:* Flink checkpoint recovery time
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am working on measuring the failure recovery time of Flink and I
>>>>>>>> want to decompose the recovery time into different parts, say the
>>>>>>>> time to detect the failure, the time to restart the job, and the
>>>>>>>> time to restore the checkpoint.
>>>>>>>>
>>>>>>>> I found that I can measure the downtime during the failure, the
>>>>>>>> time to restart the job, and some metrics for checkpointing, as
>>>>>>>> shown below.
>>>>>>>>
>>>>>>>> [image: measure.png]
>>>>>>>>
>>>>>>>> Unfortunately, I cannot find any information about the failure
>>>>>>>> detection time and the checkpoint recovery time. Is there any way
>>>>>>>> that Flink provides for this? Otherwise, how can I solve this?
>>>>>>>>
>>>>>>>> Thanks a lot for your help.
>>>>>>>>
>>>>>>>> Regards,
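On the point above about adding dedicated metrics: one low-effort approximation is to expose, from a rich function, the wall-clock time at which each subtask was (re)opened; the difference between that value and the moment fullRestarts increments gives a rough detection + restart + restore figure. A minimal sketch, assuming the standard RichFunction/MetricGroup APIs; the metric name restartEpochMillis is made up for illustration (it is not an existing Flink metric), and the value is only a proxy, not an exact restore-time measurement:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

/**
 * Pass-through mapper that exposes the wall-clock time at which this subtask
 * was (re)opened. open() runs on every (re)deployment, so after a failure the
 * gauge value minus the failure time approximates detection + restart + restore.
 */
public class RestartTimestampMapper<T> extends RichMapFunction<T, T> {

    private transient long openedAtMillis;

    @Override
    public void open(Configuration parameters) {
        openedAtMillis = System.currentTimeMillis();
        getRuntimeContext()
                .getMetricGroup()
                .gauge("restartEpochMillis", (Gauge<Long>) () -> openedAtMillis);
    }

    @Override
    public T map(T value) {
        return value;  // no-op transformation, purely for measurement
    }
}

The mapper would be added as a pass-through stage, e.g. stream.map(new RestartTimestampMapper<>()), and the gauge then appears under the normal task metric scope of whichever metrics reporter is configured.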
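For the job-level metrics already mentioned in the thread (fullRestarts, uptime, downtime), one way to sample them during an experiment is to poll the JobManager's REST interface. A small sketch; the host/port are placeholders, the job id must be supplied, and the /jobs/<jobid>/metrics?get=... endpoint should be double-checked against the REST API documentation of the Flink version in use:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class JobMetricsProbe {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JobMetricsProbe <job-id>");
            return;
        }
        // Placeholder host/port; adjust to where the JobManager REST API runs.
        URL url = new URL("http://localhost:8081/jobs/" + args[0]
                + "/metrics?get=fullRestarts,uptime,downtime");

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // Expected response: a small JSON array of {"id": ..., "value": ...} entries.
            System.out.println(body);
        } finally {
            conn.disconnect();
        }
    }
}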