Thanks Till and Yang for the help! Also thanks Till for the quick fix!

I did another test yesterday. In this test, I intentionally throw an
exception from the source operator:
```
if (runtimeContext.getIndexOfThisSubtask() == 1
    && errorFrenquecyInMin > 0
    && System.currentTimeMillis() - lastStartTime
        >= errorFrenquecyInMin * 60 * 1000) {
  lastStartTime = System.currentTimeMillis();
  throw new RuntimeException(
      "Trigger expected exception at: " + lastStartTime);
}
```
In this case, I found Phase 1 still takes about 30s, while Phase 2 dropped
to 1s (because no container allocation is needed). Why does Phase 1 still
take about 30s even though no TM is lost?
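For anyone trying to reproduce the test, the snippet above plausibly sits
inside a parallel source roughly like the following. Everything outside the
quoted if-block (class name, record type, emission loop) is a reconstruction,
not the actual job code:
```
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class FailingSource extends RichParallelSourceFunction<Long> {

    private final long errorFrenquecyInMin; // spelling kept from the snippet
    private long lastStartTime = System.currentTimeMillis();
    private volatile boolean running = true;

    public FailingSource(long errorFrenquecyInMin) {
        this.errorFrenquecyInMin = errorFrenquecyInMin;
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Subtask 1 throws once per errorFrenquecyInMin minutes, which
            // fails the task and triggers a job restart without losing a TM.
            if (getRuntimeContext().getIndexOfThisSubtask() == 1
                    && errorFrenquecyInMin > 0
                    && System.currentTimeMillis() - lastStartTime
                            >= errorFrenquecyInMin * 60 * 1000) {
                lastStartTime = System.currentTimeMillis();
                throw new RuntimeException(
                        "Trigger expected exception at: " + lastStartTime);
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(System.currentTimeMillis());
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```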
Related logs:
```
2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ... java.lang.RuntimeException: Trigger expected exception at: 1625014507446
2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph (time when all tasks switch from CREATED to RUNNING)
```

Best
Lu

On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:

> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>
> I did another test yesterday. In this test, I intentionally throw an
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
>     && errorFrenquecyInMin > 0
>     && System.currentTimeMillis() - lastStartTime
>         >= errorFrenquecyInMin * 60 * 1000) {
>   lastStartTime = System.currentTimeMillis();
>   throw new RuntimeException(
>       "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found Phase 1 still takes about 30s, while Phase 2
> dropped to 1s (because no container allocation is needed).
>
> Some logs:
> ```
> ```
>
> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> A quick addition: I think with FLINK-23202 it should now also be
>> possible to improve the heartbeat mechanism in the general case. We can
>> leverage the unreachability exception thrown when a remote target is no
>> longer reachable to mark a heartbeat target as unreachable [1]. This can
>> then be treated as if the heartbeat timeout had been triggered. That way
>> we should detect lost TaskExecutors as fast as our heartbeat interval
>> allows.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>
>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> ResourceManager should get the container completion event after the
>>> heartbeat chain Yarn NM -> Yarn RM -> Flink RM, which takes 8 seconds
>>> by default. Flink's ResourceManager will release the dead TaskManager
>>> container once it receives the completion event. As a result, Flink
>>> will not deploy tasks onto the dead TaskManagers.
>>>
>>> I think most of the time cost in Phase 1 might be cancelling the tasks
>>> on the dead TaskManagers.
>>>
>>> Best,
>>> Yang
>>>
>>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Jul 1, 2021 at
>>> 4:49 PM:
>>>
>>>> Gen's analysis is correct. Flink currently uses its heartbeat as the
>>>> primary means to detect dead TaskManagers. This means that Flink will
>>>> take at least `heartbeat.timeout` before the system recovers. Even if
>>>> the cancellation happens fast (e.g. by having configured a low
>>>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>>>> TaskManager until it is marked as dead and its slots are released
>>>> (unless the ResourceManager gets a signal from the underlying resource
>>>> management system that a container/pod has died).
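>>>>
>>>> For reference, the knobs mentioned above look roughly like this in
>>>> flink-conf.yaml (the values shown are the defaults around Flink 1.13;
>>>> double-check them against your version's documentation):
>>>> ```
>>>> # How often the JobManager requests heartbeats from each TaskManager (ms).
>>>> heartbeat.interval: 10000
>>>> # A TaskManager is only declared dead after this much time (ms) without
>>>> # a heartbeat response, so recovery waits at least this long.
>>>> heartbeat.timeout: 50000
>>>> # Upper bound for RPCs such as task cancellation to fail when the
>>>> # remote side is gone.
>>>> akka.ask.timeout: 10 s
>>>> ```
>>>> Lowering heartbeat.timeout speeds up detection of dead TaskManagers at
>>>> the cost of more false positives under long GC pauses or network jitter.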
>>>> One way to improve the situation is to introduce logic which can
>>>> react to a ConnectionException and then blacklist or release a
>>>> TaskManager, for example. This is currently not implemented in Flink,
>>>> though.
>>>>
>>>> Concerning the cancellation operation: Flink currently does not listen
>>>> to Akka's dead letters. This means that the `akka.ask.timeout` is the
>>>> primary means to fail the future result of an RPC which could not be
>>>> sent. This is also an improvement we should add to Flink's RpcService.
>>>> I've created a JIRA issue for this problem [1].
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>
>>>>> Thanks Gen! cc'ing flink-dev to collect more input.
>>>>>
>>>>> Best
>>>>> Lu
>>>>>
>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>
>>>>>> I'm also wondering about this.
>>>>>>
>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>> lost or whether it's a temporary network problem that will recover
>>>>>> soon: I can see in the log that Akka got a "Connection refused", but
>>>>>> the JM still sends heartbeat requests to the lost TM until it
>>>>>> reaches the heartbeat timeout. But I'm not sure whether it's indeed
>>>>>> designed like this.
>>>>>>
>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>> answer. Thanks.
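>>>>>>
>>>>>> PS: a toy model of the behavior I'm describing (this is not Flink's
>>>>>> actual code; all names are invented for illustration):
>>>>>> ```
>>>>>> import java.net.ConnectException;
>>>>>>
>>>>>> public class HeartbeatMonitorSketch {
>>>>>>     // cf. heartbeat.interval / heartbeat.timeout (defaults 10s / 50s)
>>>>>>     static final long HEARTBEAT_INTERVAL_MS = 10_000;
>>>>>>     static final long HEARTBEAT_TIMEOUT_MS = 50_000;
>>>>>>
>>>>>>     public static void main(String[] args) throws InterruptedException {
>>>>>>         long lastHeartbeatMs = System.currentTimeMillis();
>>>>>>         while (true) {
>>>>>>             try {
>>>>>>                 requestHeartbeat(); // pretend RPC to the TaskManager
>>>>>>                 lastHeartbeatMs = System.currentTimeMillis();
>>>>>>             } catch (ConnectException e) {
>>>>>>                 // Lost TM or transient network trouble? The JM cannot
>>>>>>                 // tell, so it keeps sending heartbeat requests.
>>>>>>             }
>>>>>>             if (System.currentTimeMillis() - lastHeartbeatMs
>>>>>>                     >= HEARTBEAT_TIMEOUT_MS) {
>>>>>>                 // Only the elapsed timeout, not the connection error,
>>>>>>                 // marks the TM as dead and starts recovery.
>>>>>>                 System.out.println("Heartbeat timeout: TM declared dead");
>>>>>>                 return;
>>>>>>             }
>>>>>>             Thread.sleep(HEARTBEAT_INTERVAL_MS);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     static void requestHeartbeat() throws ConnectException {
>>>>>>         throw new ConnectException("Connection refused"); // lost TM
>>>>>>     }
>>>>>> }
>>>>>> ```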