Thanks for sharing, Till and Yang. @Lu Sorry, but I don't know how to explain the new test result based on the log. Let's wait for others to reply.
@Till It would be nice if those JIRAs could be fixed. Thanks again for proposing them. In addition,
while investigating why recovery takes as long as the heartbeat timeout, I found that the RM keeps
allocating and freeing slots after a TM is lost, right up until its heartbeat times out. That should
be a minor bug introduced by declarative resource management. I have created a JIRA for the problem
[1], and we can discuss it there if necessary.

[1] https://issues.apache.org/jira/browse/FLINK-23216

On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:

> Another side question: shall we add a metric to cover the complete
> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
> only covers phase 1. Thanks!
>
> Best
> Lu
>
> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>         && errorFrenquecyInMin > 0
>>         && System.currentTimeMillis() - lastStartTime >=
>>                 errorFrenquecyInMin * 60 * 1000) {
>>     lastStartTime = System.currentTimeMillis();
>>     throw new RuntimeException(
>>             "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found that phase 1 still takes about 30s and phase 2
>> dropped to 1s (because no container allocation is needed). Why does
>> phase 1 still take 30s even though no TM is lost?
>>
>> Related logs:
>> ```
>> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98)
>> switched from state RUNNING to RESTARTING.
>> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98)
>> switched from state RESTARTING to RUNNING.
>> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph (time when
>> all tasks switch from CREATED to RUNNING)
>> ```
>> Best
>> Lu
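The guard quoted above is only the failure-injection check from the test job. For anyone who wants
to reproduce the experiment, a minimal, self-contained sketch of such a source is shown below; the
class name FailingSource, the field errorFrequencyInMin, and the Long record type are illustrative
assumptions, not the code of the original job.

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Illustrative sketch (not the original job's code): a source that keeps
 * emitting records and, on subtask 1 only, throws an exception every
 * errorFrequencyInMin minutes, so the job restarts without losing a TM.
 */
public class FailingSource extends RichParallelSourceFunction<Long> {

    private final long errorFrequencyInMin;
    private transient long lastStartTime;
    private volatile boolean running = true;

    public FailingSource(long errorFrequencyInMin) {
        this.errorFrequencyInMin = errorFrequencyInMin;
    }

    @Override
    public void open(Configuration parameters) {
        lastStartTime = System.currentTimeMillis();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            // Same guard as in the quoted snippet: only subtask 1 fails, and
            // only once errorFrequencyInMin minutes have passed since the last failure.
            if (getRuntimeContext().getIndexOfThisSubtask() == 1
                    && errorFrequencyInMin > 0
                    && System.currentTimeMillis() - lastStartTime
                            >= errorFrequencyInMin * 60 * 1000) {
                lastStartTime = System.currentTimeMillis();
                throw new RuntimeException(
                        "Trigger expected exception at: " + lastStartTime);
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Because the exception comes from a single subtask and no container is lost, this reproduces the
observation above that phase 2 drops to about a second while phase 1 still takes about 30s.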
>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally throw an
>>> exception from the source operator:
>>> ```
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>         && errorFrenquecyInMin > 0
>>>         && System.currentTimeMillis() - lastStartTime >=
>>>                 errorFrenquecyInMin * 60 * 1000) {
>>>     lastStartTime = System.currentTimeMillis();
>>>     throw new RuntimeException(
>>>             "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found that phase 1 still takes about 30s and phase 2
>>> dropped to 1s (because no container allocation is needed).
>>>
>>> Some logs:
>>> ```
>>> ```
>>>
>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> A quick addition: I think with FLINK-23202 it should now also be
>>>> possible to improve the heartbeat mechanism in the general case. We can
>>>> leverage the unreachability exception thrown when a remote target is no
>>>> longer reachable to mark a heartbeat target as unreachable [1]. This can
>>>> then be treated as if the heartbeat timeout had been triggered. That way
>>>> we should detect lost TaskExecutors as fast as our heartbeat interval.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>
>>>>> Since you are deploying Flink workloads on YARN, the Flink
>>>>> ResourceManager should get the container completion event after the
>>>>> heartbeat chain YARN NM -> YARN RM -> Flink RM, which takes 8 seconds by
>>>>> default. The Flink ResourceManager will release the dead TaskManager
>>>>> container once it receives the completion event. As a result, Flink will
>>>>> not deploy tasks onto the dead TaskManagers.
>>>>>
>>>>> I think most of the time cost in phase 1 might be cancelling the tasks
>>>>> on the dead TaskManagers.
>>>>>
>>>>> Best,
>>>>> Yang
>>>>>
>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Gen's analysis is correct. Flink currently uses its heartbeat as
>>>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>>>> take at least `heartbeat.timeout` before the system recovers. Even if the
>>>>>> cancellation happens fast (e.g. by having configured a low
>>>>>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>>>>>> TaskManager until it is marked as dead and its slots are released; the
>>>>>> slots are only released earlier if the ResourceManager gets a signal from
>>>>>> the underlying resource management system that a container/pod has died.
>>>>>> One way to improve the situation is to introduce logic which can react to
>>>>>> a ConnectionException and then, for example, blacklist or release a
>>>>>> TaskManager. This is currently not implemented in Flink, though.
>>>>>>
>>>>>> Concerning the cancellation operation: Flink currently does not
>>>>>> listen to Akka's dead letters. This means that the `akka.ask.timeout` is
>>>>>> the primary means to fail the future result of an RPC which could not be
>>>>>> sent. This is also an improvement we should add to Flink's RpcService.
>>>>>> I've created a JIRA issue for this problem [1].
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Gen! cc flink-dev to collect more input.
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm also wondering here.
>>>>>>>>
>>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>>>> lost or whether it is a temporary network problem that will recover
>>>>>>>> soon, since I can see in the log that Akka got a "Connection refused"
>>>>>>>> but the JM still sends heartbeat requests to the lost TM until the
>>>>>>>> heartbeat timeout is reached. But I'm not sure whether it's indeed
>>>>>>>> designed like this.
>>>>>>>>
>>>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>>>> answer. Thanks.
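For anyone who wants to tune the recovery behaviour discussed in this thread while the improvements
above are pending, the relevant knobs are ordinary flink-conf.yaml settings. The snippet below is a
sketch with the defaults as of roughly Flink 1.13 (the release current at the time of this thread);
double-check the values against the documentation of your own version before changing them.

```
# Heartbeat-based failure detection: with purely heartbeat-based detection,
# recovering from a lost TaskManager takes at least heartbeat.timeout.
# Defaults: 10 s interval, 50 s timeout. Lowering the timeout shortens
# phase 1 of the recovery, at the risk of false positives under GC pauses
# or transient network problems.
heartbeat.interval: 10000
heartbeat.timeout: 50000

# Timeout for RPC asks, e.g. the cancel calls sent to a TaskManager that is
# no longer reachable. A low value lets the cancellation fail fast, but by
# itself it does not mark the TaskManager as dead.
akka.ask.timeout: 10 s
```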