Hi Till,

Thanks for the reply. Is the 10s timeout here always necessary? Can I reduce it to shorten the restart time of the job? (That would also match the ~30s I observed: 3 cancellation attempts x 10s each.) I currently cannot find a corresponding option in the Flink configuration.
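If I read Execution.java correctly, the 10s seems to come from the RPC ask timeout (akka.ask.timeout, default "10 s"), while the dead-TaskManager detection is controlled by heartbeat.interval / heartbeat.timeout, which I already tried lowering. Purely as an experiment, I was planning to try something like the following in flink-conf.yaml (the values are just my own guesses for testing, not recommendations):

    # flink-conf.yaml (experimental values for a test cluster only)
    akka.ask.timeout: 5 s        # RPC ask timeout, default "10 s"; my guess for the 10s above
    heartbeat.interval: 2000     # milliseconds, default 10000
    heartbeat.timeout: 10000     # milliseconds, default 50000

Please correct me if akka.ask.timeout is not the right knob here, or if lowering it globally is likely to make other RPCs time out spuriously.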
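For completeness, since cancellation time also depends on user code: as I wrote further down in the thread, the job is essentially just a map followed by a tumbling-window aggregation, so I don't think the user code is blocking cancellation. Here is a minimal sketch of that shape (placeholder source, toy aggregation, and a made-up window size; not the actual job):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class PipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "a")                                 // placeholder source (the real job reads from a streaming source)
               .map(String::toUpperCase)                                    // simple, non-blocking map
               .keyBy(word -> word)
               .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  // tumbling window
               .reduce((a, b) -> a + "," + b)                               // toy per-window aggregation
               .print();

            env.execute("map + tumbling window sketch");
        }
    }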
Regards,
Zhinan

On Fri, 21 Aug 2020 at 15:28, Till Rohrmann <trohrm...@apache.org> wrote:

> You are right. The problem is that Flink tries three times to cancel the call, and every RPC call has a timeout of 10s.
> Since the machine on which the Task ran has died, it will take that long until the system decides to fail the Task instead [1].
>
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1390
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 6:17 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk> wrote:
>
>> Hi Till,
>>
>> Thanks for the quick reply.
>>
>> Yes, the job actually restarts twice; the fullRestarts metric also indicates this, its value is 2.
>> But the job indeed takes around 30s to switch from CANCELLING to RESTARTING in its first restart.
>> I just wonder why it takes so long here.
>>
>> Also, even if I set the heartbeat timeout from the default 50s down to 5s, this time stays about the same, so I think it is not related to the heartbeat timeout.
>>
>> Regards,
>> Zhinan
>>
>> On Fri, 21 Aug 2020 at 00:02, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Zhinan,
>>>
>>> the logs show that the cancellation does not take 30s. What happens is that the job gets restarted a couple of times.
>>> The problem seems to be that one TaskManager died permanently, but it takes the heartbeat timeout (default 50s) until it is detected as dead.
>>> In the meantime the system tries to redeploy tasks, which will cause the job to fail again and again.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng <znch...@cse.cuhk.edu.hk> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Sorry for the late reply.
>>>> Attached is the log of the jobmanager.
>>>> I notice that while the job is being cancelled, the jobmanager also warns that the connection to the failed taskmanager is lost.
>>>> This lasts for about 30s, and then the jobmanager successfully cancels the operator instances related to the failed taskmanager and restarts the job.
>>>> Is there any way to reduce the restart time?
>>>>
>>>> Thanks a lot.
>>>>
>>>> Regards,
>>>> Zhinan
>>>>
>>>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Could you share the logs with us? This might help to explain why the cancellation takes so long.
>>>>> Flink is no longer using Akka's death watch mechanism.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> Thanks for the quick response.
>>>>>>
>>>>>> > for i) the cancellation depends on the user code. If the user code does a blocking operation, Flink needs to wait until it returns from there before it can move the Task's state to CANCELED.
>>>>>> For this, my code just includes a map operation and then aggregates the results into a tumbling window, so I think in this case the time is not attributed to the user code.
>>>>>> Looking into the log, during this period I observed that the jobmanager keeps warning that its connection to the failed taskmanager is lost.
>>>>>> I am not sure whether this is what delays the cancellation; do you have any ideas about this?
>>>>>>
>>>>>> I am also looking into Akka's death watch mechanism [1] to see whether it could be the reason.
>>>>>>
>>>>>> For (ii), I will open the JIRA issue as you suggested.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>>>
>>>>>> Regards,
>>>>>> Zhinan
>>>>>>
>>>>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Zhinan,
>>>>>>>
>>>>>>> for i) the cancellation depends on the user code. If the user code does a blocking operation, Flink needs to wait until it returns from there before it can move the Task's state to CANCELED.
>>>>>>>
>>>>>>> for ii) I think your observation is correct. Could you please open a JIRA issue for this problem so that it can be fixed in Flink? Thanks a lot!
>>>>>>>
>>>>>>> For the time to restore the checkpoints it could also be interesting to add a proper metric to Flink. Hence, you could also create a JIRA issue for it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng <znch...@cse.cuhk.edu.hk> wrote:
>>>>>>>
>>>>>>>> Hi Yun,
>>>>>>>>
>>>>>>>> Thanks a lot for your help. It seems hard to measure the checkpoint restore time currently.
>>>>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and "downtime" to get some information about failure recovery.
>>>>>>>>
>>>>>>>> Still, I have some points of confusion:
>>>>>>>> i) I found that it can take the jobmanager up to 30s to move the job from status CANCELING to status CANCELED. Is there any reason why it takes so long? Can I reduce this time?
>>>>>>>> ii) Currently the way the "downtime" metric is calculated is not consistent with the description in the doc: right now the downtime is actually the current timestamp minus the timestamp when the job was started, but I think the doc clearly intends the current timestamp minus the timestamp when the job failed.
>>>>>>>>
>>>>>>>> I think I need to measure these times by adding dedicated metrics myself.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Zhinan
>>>>>>>>
>>>>>>>> On Wed, 19 Aug 2020 at 01:45, Yun Tang <myas...@live.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Zhinan,
>>>>>>>>>
>>>>>>>>> For the time to detect the failure, you could refer to the time when 'fullRestarts' increases. That could give you information about the time of the job failure.
>>>>>>>>>
>>>>>>>>> For the checkpoint recovery time, there are actually two parts:
>>>>>>>>>
>>>>>>>>> 1. The time to read the checkpoint metadata in the JM. There is currently no explicit metric for this, as it is essentially just reading a roughly 1 MB file remotely from DFS.
>>>>>>>>> 2. The time for tasks to restore state. This should be treated as the real checkpoint recovery time and can even exceed 10 minutes when restoring a savepoint. Unfortunately, this part is also not recorded in metrics at the moment.
>>>>>>>>> If you find a task in RUNNING state that is not consuming any records, it might be stuck restoring a checkpoint/savepoint.
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Yun Tang
>>>>>>>>> ------------------------------
>>>>>>>>> From: Zhinan Cheng <chingchi...@gmail.com>
>>>>>>>>> Sent: Tuesday, August 18, 2020 11:50
>>>>>>>>> To: user@flink.apache.org <user@flink.apache.org>
>>>>>>>>> Subject: Flink checkpoint recovery time
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am working on measuring the failure recovery time of Flink, and I want to decompose the recovery time into different parts: the time to detect the failure, the time to restart the job, and the time to restore the checkpoint.
>>>>>>>>>
>>>>>>>>> I found that I can measure the downtime during a failure, the time to restart the job, and some checkpointing metrics, as shown below.
>>>>>>>>>
>>>>>>>>> [image: measure.png]
>>>>>>>>> Unfortunately, I cannot find any information about the failure detection time and the checkpoint recovery time. Does Flink provide any way to obtain these, and if not, how can I measure them?
>>>>>>>>>
>>>>>>>>> Thanks a lot for your help.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>