Another side question: shall we add a metric to cover the complete restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only covers phase 1. Thanks!
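
In the meantime, a rough way to approximate phase 1 + phase 2 from the outside is to poll the JobManager REST API and measure from the first observation of a not-fully-running job until the job state is RUNNING again and every vertex reports all of its subtasks as RUNNING. Below is a minimal sketch of that idea; the class is only an illustration, and the JSON field names ("state", "vertices", "parallelism", "tasks") are what /jobs/:jobid should return, but please double-check them against the REST responses of your Flink version.

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Polls /jobs/<jobId> and reports how long the job was not fully running,
// i.e. phase 1 (job state != RUNNING) plus phase 2 (not all tasks RUNNING yet).
// Stop it with Ctrl-C; it keeps reporting one duration per completed restart.
public class RestartDurationProbe {
    public static void main(String[] args) throws Exception {
        String restBase = args[0]; // e.g. http://<jobmanager-host>:8081
        String jobId = args[1];
        HttpClient http = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper(); // needs jackson-databind on the classpath

        Long outageStart = null;
        while (true) {
            HttpRequest request =
                    HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId)).GET().build();
            JsonNode job = mapper.readTree(
                    http.send(request, HttpResponse.BodyHandlers.ofString()).body());

            boolean jobRunning = "RUNNING".equals(job.get("state").asText());
            boolean allTasksRunning = true;
            for (JsonNode vertex : job.get("vertices")) {
                JsonNode runningCount = vertex.get("tasks").get("RUNNING");
                if (runningCount == null
                        || runningCount.asInt() != vertex.get("parallelism").asInt()) {
                    allTasksRunning = false;
                }
            }

            long now = System.currentTimeMillis();
            if (!(jobRunning && allTasksRunning) && outageStart == null) {
                outageStart = now; // phase 1 starts, as seen by the poller
            } else if (jobRunning && allTasksRunning && outageStart != null) {
                System.out.println("complete restart took ~" + (now - outageStart) + " ms");
                outageStart = null; // phase 2 ended
            }
            Thread.sleep(500); // polling resolution limits the accuracy of the measurement
        }
    }
}
```

Of course this is only an external workaround with polling-level accuracy, so a proper metric that covers both phases would still be very useful.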
Best
Lu

On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:

> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>
> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
>         && errorFrenquecyInMin > 0
>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>     lastStartTime = System.currentTimeMillis();
>     throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found that phase 1 still takes about 30s while phase 2 dropped to 1s (because no container allocation is needed). Why does phase 1 still take 30s even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph (time when all tasks switched from CREATED to RUNNING)
> ```
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>         && errorFrenquecyInMin > 0
>>         && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>     lastStartTime = System.currentTimeMillis();
>>     throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found that phase 1 still takes about 30s while phase 2 dropped to 1s (because no container allocation is needed).
>>
>> Some logs:
>> ```
>> ```
>>
>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> A quick addition: I think with FLINK-23202 it should now also be possible to improve the heartbeat mechanism in the general case. We can leverage the unreachability exception thrown if a remote target is no longer reachable to mark a heartbeat target as no longer reachable [1]. This can then be treated as if the heartbeat timeout had been triggered. That way we should be able to detect lost TaskExecutors as quickly as our heartbeat interval.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>>> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager should get the container completion event after the heartbeat of Yarn NM -> Yarn RM -> Flink RM, which is 8 seconds by default. And the Flink ResourceManager will release the dead TaskManager container once it receives the completion event. As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>
>>>> I think most of the time cost in phase 1 might be cancelling the tasks on the dead TaskManagers.
>>>>
>>>> Best,
>>>> Yang
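
A note on the configuration knobs in Till's reply quoted directly below: heartbeat.interval defaults to 10000 ms, heartbeat.timeout to 50000 ms, and akka.ask.timeout to 10 s, all of which are set in flink-conf.yaml on a real cluster. For experimenting locally, the same keys can also be passed programmatically. The sketch below is only an illustration with made-up values (not tuning advice), it assumes Flink 1.12+, and the sequence source is just a placeholder for a job that actually fails:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartTimingExperiment {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same keys as in flink-conf.yaml. Lower values detect a lost TM faster, but also
        // make the cluster more sensitive to GC pauses and transient network hiccups.
        conf.setString("heartbeat.interval", "5000");  // default: 10000 (ms)
        conf.setString("heartbeat.timeout", "20000");  // default: 50000 (ms)
        conf.setString("akka.ask.timeout", "10 s");    // default: 10 s

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        // Replace this placeholder pipeline with a job that fails periodically
        // (e.g. the failing test source sketched in the PS at the bottom of this mail)
        // to observe how the settings above affect restart timing.
        env.fromSequence(0, Long.MAX_VALUE).print();
        env.execute("restart-timing-experiment");
    }
}
```

Whether a local MiniCluster behaves exactly like a YARN deployment in this respect is something to verify, so please treat this only as a starting point for experiments.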
>>>>
>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Gen's analysis is correct. Flink currently uses its heartbeat as the primary means to detect dead TaskManagers. This means that Flink will take at least `heartbeat.timeout` before the system recovers. Even if the cancellation happens fast (e.g. because a low akka.ask.timeout is configured), Flink will still try to deploy tasks onto the dead TaskManager until it is marked as dead and its slots are released (unless the ResourceManager gets a signal from the underlying resource management system that a container/pod has died). One way to improve the situation is to introduce logic which reacts to a ConnectionException and then, for example, blacklists or releases the TaskManager. This is currently not implemented in Flink, though.
>>>>>
>>>>> Concerning the cancellation operation: Flink currently does not listen to Akka's dead letters. This means that `akka.ask.timeout` is the primary means to fail the future result of an RPC which could not be sent. This is also an improvement we should add to Flink's RpcService. I've created a JIRA issue for this problem [1].
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>
>>>>>> Best
>>>>>> Lu
>>>>>>
>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm also wondering here.
>>>>>>>
>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is lost or whether it is just temporary network trouble that will recover soon: I can see in the log that Akka got a "Connection refused", but the JM still sends heartbeat requests to the lost TM until the heartbeat timeout is reached. However, I'm not sure whether it is indeed designed like this.
>>>>>>>
>>>>>>> I would really appreciate it if anyone who knows more details could answer. Thanks.
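
PS: for anyone who wants to reproduce the 30s observation from my second test, the if-block quoted above sits in the run() loop of a plain parallel source, roughly like the sketch below. The class name, the emitted record type, and the failure frequency are made up here; only the if-block is the actual code from the test.

```
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Test source: emits a counter, and subtask 1 throws every errorFrenquecyInMin minutes.
public class FailingSource extends RichParallelSourceFunction<Long> {

    private final long errorFrenquecyInMin = 5; // how often subtask 1 should fail
    private long lastStartTime = System.currentTimeMillis();
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            if (getRuntimeContext().getIndexOfThisSubtask() == 1
                    && errorFrenquecyInMin > 0
                    && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
                lastStartTime = System.currentTimeMillis();
                throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
            }
            // emit under the checkpoint lock, as required by SourceFunction sources
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(value++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

With parallelism >= 2, subtask 1 fails periodically without the TaskManager being lost, which is exactly the restart path whose phase 1 still took about 30s in the logs above.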