A quick addition: I think that with FLINK-23202 it should now also be possible to improve the heartbeat mechanism in the general case. We can leverage the unreachability exception thrown when a remote target is no longer reachable to mark a heartbeat target as unreachable [1]. This can then be treated as if the heartbeat timeout had been triggered. That way we should detect lost TaskExecutors roughly as fast as our heartbeat interval.
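To make the idea a bit more concrete, here is a minimal sketch of the mechanism. The names (HeartbeatMonitorSketch, reportTargetUnreachable, TimeoutListener) are made up for illustration and are not Flink's actual heartbeat classes; the point is only that an RPC-level unreachability signal can reuse the existing timeout callback:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: a heartbeat monitor that treats an RPC-level "target
// unreachable" signal exactly like an expired heartbeat timeout, so a lost
// TaskExecutor is detected after one failed heartbeat attempt instead of
// after the full heartbeat.timeout.
public class HeartbeatMonitorSketch {

    interface TimeoutListener {
        void notifyHeartbeatTimeout(String targetId);
    }

    private final String targetId;
    private final long timeoutMillis;
    private final TimeoutListener listener;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pendingTimeout;

    HeartbeatMonitorSketch(String targetId, long timeoutMillis, TimeoutListener listener) {
        this.targetId = targetId;
        this.timeoutMillis = timeoutMillis;
        this.listener = listener;
        resetTimeout();
    }

    // Called whenever a heartbeat answer from the target arrives.
    synchronized void reportHeartbeat() {
        resetTimeout();
    }

    // Called when the RPC layer signals that the target is unreachable
    // (e.g. the connection was refused): fire the same callback as a
    // regular timeout instead of waiting for it to expire.
    synchronized void reportTargetUnreachable() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
        }
        listener.notifyHeartbeatTimeout(targetId);
    }

    private synchronized void resetTimeout() {
        if (pendingTimeout != null) {
            pendingTimeout.cancel(false);
        }
        pendingTimeout = scheduler.schedule(
                () -> listener.notifyHeartbeatTimeout(targetId),
                timeoutMillis,
                TimeUnit.MILLISECONDS);
    }
}

With such a short-circuit, the detection latency is bounded by the heartbeat interval (plus the failed RPC) rather than by heartbeat.timeout.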
[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
> should get the container completion event after the heartbeat chain of
> Yarn NM -> Yarn RM -> Flink RM, which takes 8 seconds by default. The Flink
> ResourceManager will then release the dead TaskManager container once it
> receives the completion event. As a result, Flink will not deploy tasks
> onto the dead TaskManagers.
>
> I think most of the time cost in Phase 1 might be cancelling the tasks on
> the dead TaskManagers.
>
> Best,
> Yang
>
> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Jul 1, 2021 at 4:49 PM:
>
>> Gen's analysis is correct. Flink currently uses its heartbeat as the
>> primary means to detect dead TaskManagers. This means that Flink will
>> take at least `heartbeat.timeout` before the system recovers. Even if the
>> cancellation happens quickly (e.g. because a low akka.ask.timeout is
>> configured), Flink will still try to deploy tasks onto the dead
>> TaskManager until it is marked as dead and its slots are released (unless
>> the ResourceManager gets a signal from the underlying resource management
>> system that a container/pod has died). One way to improve the situation
>> is to introduce logic which reacts to a ConnectionException and then, for
>> example, blacklists or releases the TaskManager. This is currently not
>> implemented in Flink, though.
>>
>> Concerning the cancellation operation: Flink currently does not listen to
>> Akka's dead letters. This means that the `akka.ask.timeout` is the
>> primary means to fail the future result of an RPC which could not be
>> sent. This is also an improvement we should add to Flink's RpcService.
>> I've created a JIRA issue for this problem [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Thanks Gen! cc'ing flink-dev to collect more input.
>>>
>>> Best,
>>> Lu
>>>
>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>
>>>> I'm also wondering about this.
>>>>
>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>> lost or whether it is only a temporary network problem that will
>>>> recover soon: I can see in the log that Akka got a "Connection
>>>> refused", but the JM still sends heartbeat requests to the lost TM
>>>> until the heartbeat timeout is reached. I'm not sure whether it is
>>>> indeed designed like this, though.
>>>>
>>>> I would really appreciate it if anyone who knows more details could
>>>> answer. Thanks.
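For reference, the knobs discussed in the thread above are the regular Flink configuration options heartbeat.interval, heartbeat.timeout and akka.ask.timeout (settable in flink-conf.yaml or programmatically). A small sketch, assuming flink-core is on the classpath; the values are only illustrative and defaults can differ between Flink versions:

import org.apache.flink.configuration.Configuration;

// Illustrative only: the timeouts discussed in the thread above.
public class RecoveryTimeoutsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // JM/RM <-> TM heartbeat: currently the primary detector for dead TaskManagers.
        conf.setString("heartbeat.interval", "10000"); // milliseconds
        conf.setString("heartbeat.timeout", "50000");  // milliseconds
        // Upper bound on how long a single RPC (e.g. a task cancellation
        // request) may stay pending before its future fails.
        conf.setString("akka.ask.timeout", "10 s");
        System.out.println(conf);
    }
}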