Yes, I have noticed the PR and commented there with some considerations about the new option. We can discuss further there.

On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann <trohrm...@apache.org> wrote:

> This is actually a very good point, Gen. There might not be a lot to gain for us by implementing a fancy algorithm for figuring out whether a TM is dead or not based on failed heartbeat RPCs from the JM, if the TM <> TM communication does not tolerate failures and directly fails the affected tasks. This assumes that the JM and TMs run in the same environment.
>
> One simple approach could be to make the number of failed heartbeat RPCs until a target is marked as unreachable configurable, because what represents a good enough criterion in one user's environment might produce too many false positives in somebody else's environment. Or, even simpler, one could say that one can disable reacting to a failed heartbeat RPC, as is currently the case.
>
> We currently have a discussion about this on this PR [1]. Maybe you wanna join the discussion there and share your insights.
>
> [1] https://github.com/apache/flink/pull/16357
>
> Cheers,
> Till
>
> On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> I know that there are retry strategies for akka rpc frameworks. I was just considering that, since the environment is shared by the JM and TMs, and the connections among TMs (using netty) are flaky in unstable environments, which will also cause job failures, is it necessary to build a strongly guaranteed connection between the JM and the TMs, or could it be as flaky as the connections among TMs?
>>
>> As far as I know, connections among TMs will just fail on their first connection loss, so behaving like this in the JM just means "as flaky as connections among TMs". In a stable environment it's good enough, but in an unstable environment it indeed increases the instability. IMO, though a single connection loss is not reliable, a double check should be good enough. But since I'm not experienced with unstable environments, I can't tell whether that's also enough for them.
>>
>> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> I think for RPC communication there are retry strategies used by the underlying Akka ActorSystem. So an RpcEndpoint can reconnect to a remote ActorSystem and resume communication. Moreover, there are also reconciliation protocols in place which reconcile the states between the components because of potentially lost RPC messages. So the main question would be whether a single connection loss is good enough for triggering the timeout, or whether we want a more elaborate mechanism to reason about the availability of the remote system (e.g. a couple of lost heartbeat messages).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com> wrote:
>>>
>>>> As far as I know, a TM will report a connection failure once a TM it is connected to is lost. I suppose the JM can believe the report and fail the tasks on the lost TM if it also encounters a connection failure.
>>>>
>>>> Of course, this won't work if the lost TM is standalone. But I suppose we can use the same strategy as in the connected scenario. That is, consider it possibly lost on the first connection loss, and fail it if a double check also fails. The major difference is that the probes come from the same sender rather than from two different roles, so the results may tend to be the same.
>>>>
>>>> On the other hand, this fact also means that jobs can be fragile in an unstable environment, no matter whether the failover is triggered by a TM or the JM. So maybe it's not worthwhile to introduce extra configuration for the fault tolerance of heartbeats, unless we also introduce retry strategies for netty connections.
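To make the proposal above more concrete, here is a minimal sketch of the "configurable number of failed heartbeat RPCs" idea: a threshold of 2 corresponds to the double check Gen describes, while a very large threshold effectively disables the reaction to failed heartbeat RPCs, as Till suggests. The class and method names are invented for illustration; this is not Flink's actual heartbeat implementation.

```
// Illustrative sketch only: count consecutive failed heartbeat RPCs and treat
// the target as unreachable once a configurable threshold is reached.
import java.util.concurrent.atomic.AtomicInteger;

public class UnreachabilityTracker {

    // 1 = react to the first connection loss, 2 = "double check",
    // Integer.MAX_VALUE = effectively disable the reaction.
    private final int failedRpcThreshold;
    private final AtomicInteger consecutiveFailures = new AtomicInteger(0);

    public UnreachabilityTracker(int failedRpcThreshold) {
        this.failedRpcThreshold = failedRpcThreshold;
    }

    /**
     * Called when a heartbeat RPC to the target fails; returns true once the
     * target should be treated as if its heartbeat timed out.
     */
    public boolean onHeartbeatRpcFailure() {
        return consecutiveFailures.incrementAndGet() >= failedRpcThreshold;
    }

    /** Called when a heartbeat RPC succeeds; a reachable target resets the counter. */
    public void onHeartbeatRpcSuccess() {
        consecutiveFailures.set(0);
    }
}
```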
>>>>
>>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Could you share the full logs with us for the second experiment, Lu? I cannot tell off the top of my head why it should take 30s, unless you have configured a restart delay of 30s.
>>>>>
>>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>>>
>>>>> I've now implemented FLINK-23209 [1], but it somehow has the problem that in a flaky environment you might not want to mark a TaskExecutor dead on the first connection loss. Maybe this is something we need to make configurable (e.g. introducing a threshold, which admittedly is similar to the heartbeat timeout) so that the user can configure it for her environment. On the upside, if you mark the TaskExecutor dead on the first connection loss (assuming you have a stable network environment), then it can now detect lost TaskExecutors as fast as the heartbeat interval.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for sharing, Till and Yang.
>>>>>>
>>>>>> @Lu Sorry, but I don't know how to explain the new test with the log. Let's wait for others' replies.
>>>>>>
>>>>>> @Till It would be nice if the JIRAs could be fixed. Thanks again for proposing them.
>>>>>>
>>>>>> In addition, I was tracking an issue where the RM keeps allocating and freeing slots after a TM is lost, until its heartbeat times out, which is how I found the recovery taking as long as the heartbeat timeout. That should be a minor bug introduced by declarative resource management. I have created a JIRA about the problem [1] and we can discuss it there if necessary.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>>>>
>>>>>> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Another side question: shall we add a metric to cover the complete restarting time (phase 1 + phase 2)? The current metric jm.restartingTime only covers phase 1. Thanks!
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Till and Yang for the help! Also thanks Till for a quick fix!
>>>>>>>>
>>>>>>>> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
>>>>>>>> ```
>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>>>>>     && errorFrenquecyInMin > 0
>>>>>>>>     && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>>>>>>   lastStartTime = System.currentTimeMillis();
>>>>>>>>   throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>>>>>>> }
>>>>>>>> ```
>>>>>>>> In this case, I found that phase 1 still takes about 30s while phase 2 dropped to 1s (because there is no need for container allocation). Why does phase 1 still take 30s even though no TM is lost?
>>>>>>>>
>>>>>>>> Related logs:
>>>>>>>> ```
>>>>>>>> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ... java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>>>>>>> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>>>>>>>> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>>>>>>>> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph (time when all tasks switch from CREATED to RUNNING)
>>>>>>>> ```
>>>>>>>> Best
>>>>>>>> Lu
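For what it's worth, the roughly 30 s between the RESTARTING and RUNNING log lines above is exactly what a fixed-delay restart strategy with a 30 s delay would produce, which is the explanation Till hints at. If that is the cause, the delay would have been configured along these lines (the values shown are illustrative, not a recommendation):

```
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartDelayExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fixed-delay restart strategy: each restart attempt waits 30 seconds,
        // which shows up as ~30s spent in the RESTARTING state ("phase 1").
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                                // number of restart attempts
                Time.of(30, TimeUnit.SECONDS)));  // delay between attempts
    }
}
```

The same delay can also come from `restart-strategy.fixed-delay.delay` in flink-conf.yaml, so both places are worth checking.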
>>>>>>>>
>>>>>>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Till and Yang for the help! Also thanks Till for a quick fix!
>>>>>>>>>
>>>>>>>>> I did another test yesterday. In this test, I intentionally throw an exception from the source operator:
>>>>>>>>> ```
>>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>>>>>>     && errorFrenquecyInMin > 0
>>>>>>>>>     && System.currentTimeMillis() - lastStartTime >= errorFrenquecyInMin * 60 * 1000) {
>>>>>>>>>   lastStartTime = System.currentTimeMillis();
>>>>>>>>>   throw new RuntimeException("Trigger expected exception at: " + lastStartTime);
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>> In this case, I found that phase 1 still takes about 30s while phase 2 dropped to 1s (because there is no need for container allocation).
>>>>>>>>>
>>>>>>>>> Some logs:
>>>>>>>>> ```
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> A quick addition: I think with FLINK-23202 it should now also be possible to improve the heartbeat mechanism in the general case. We can leverage the unreachability exception thrown if a remote target is no longer reachable to mark a heartbeat target as no longer reachable [1]. This can then be treated as if the heartbeat timeout had been triggered. That way we should detect lost TaskExecutors as fast as our heartbeat interval.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager should get the container completion event after the heartbeat of Yarn NM -> Yarn RM -> Flink RM, which is 8 seconds by default. And the Flink ResourceManager will release the dead TaskManager container once it receives the completion event. As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>>>>>>>>
>>>>>>>>>>> I think most of the time cost in phase 1 might be cancelling the tasks on the dead TaskManagers.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Yang
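As a reference point for the timings discussed in the thread, these are the two heartbeat settings everything revolves around, shown with the defaults that applied at the time (10 s interval, 50 s timeout). This is only a sketch using plain string keys on Flink's Configuration; the values are the defaults, not tuning advice.

```
import org.apache.flink.configuration.Configuration;

public class HeartbeatSettingsExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Today a dead TM is only detected once heartbeat.timeout expires
        // (50 s by default), so this is the main knob for faster detection,
        // at the price of more false positives on a flaky network.
        conf.setString("heartbeat.timeout", "50000");  // milliseconds

        // With the FLINK-23202 / FLINK-23209 improvements discussed above,
        // a connection loss can be treated like an expired heartbeat, bringing
        // detection down to roughly one heartbeat.interval (10 s by default).
        conf.setString("heartbeat.interval", "10000"); // milliseconds

        System.out.println(conf);
    }
}
```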
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Gen's analysis is correct. Flink currently uses its heartbeat as the primary means to detect dead TaskManagers. This means that Flink will take at least `heartbeat.timeout` time before the system recovers. Even if the cancellation happens fast (e.g. by having configured a low akka.ask.timeout), Flink will still try to deploy tasks onto the dead TaskManager until it is marked as dead and its slots are released (unless the ResourceManager gets a signal from the underlying resource management system that a container/pod has died). One way to improve the situation is to introduce logic which can react to a ConnectionException and then blacklists or releases a TaskManager, for example. This is currently not implemented in Flink, though.
>>>>>>>>>>>>
>>>>>>>>>>>> Concerning the cancellation operation: Flink currently does not listen to Akka's dead letters. This means that the `akka.ask.timeout` is the primary means to fail the future result of an RPC which could not be sent. This is also an improvement we should add to Flink's RpcService. I've created a JIRA issue for this problem [1].
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Gen! cc flink-dev to collect more input.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best
>>>>>>>>>>>>> Lu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm also wondering about this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is lost or whether it's a temporary network problem that will recover soon. I can see in the log that Akka got a "Connection refused", but the JM still sends heartbeat requests to the lost TM until it reaches the heartbeat timeout. But I'm not sure whether it's indeed designed like this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would really appreciate it if anyone who knows more details could answer. Thanks.
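Finally, a hypothetical sketch of the improvement Till describes: reacting to a connection error on a heartbeat RPC as if the heartbeat timeout had fired, instead of waiting for the full timeout. All class, interface, and method names below are invented and greatly simplified compared to Flink's real heartbeat and RPC services.

```
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;

public class ConnectionLossReaction {

    /** Simplified stand-in for a heartbeat listener; invented for this sketch. */
    interface HeartbeatListener {
        void notifyTargetUnreachable(String taskManagerId);
    }

    private final HeartbeatListener listener;

    public ConnectionLossReaction(HeartbeatListener listener) {
        this.listener = listener;
    }

    /** Hook this onto the future returned by a heartbeat RPC sent to a TaskManager. */
    public void watch(String taskManagerId, CompletableFuture<Void> heartbeatRpcResult) {
        heartbeatRpcResult.whenComplete((ignored, failure) -> {
            if (failure != null && isConnectionLoss(failure)) {
                // Treat the connection loss as if the heartbeat timeout had fired,
                // so recovery can start after roughly one heartbeat interval
                // instead of the full heartbeat timeout.
                listener.notifyTargetUnreachable(taskManagerId);
            }
        });
    }

    private static boolean isConnectionLoss(Throwable t) {
        // Simplified check; a real implementation would inspect the RPC
        // framework's specific unreachability exception instead.
        return t instanceof ConnectException || t.getCause() instanceof ConnectException;
    }
}
```

Combined with a threshold like the one sketched earlier, this is the trade-off between detection speed and robustness to flaky networks that the PR discussion is about.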