Hi Vishal, you should not need to configure anything else.
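The 1.5 heartbeat mechanism does come with its own settings in flink-conf.yaml, but the defaults should be fine. For reference, a minimal sketch (the values are what I believe the 1.5 defaults to be; please double-check against your version):

    heartbeat.interval: 10000    # ms between heartbeat requests
    heartbeat.timeout: 50000     # ms without an answer before the remote component is considered lost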
Cheers,
Till

On Sat, Jun 30, 2018 at 7:23 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

A clarification: in 1.5, with the custom heartbeats, are there additional configurations we should be concerned about?

On Fri, May 25, 2018 at 10:17 AM, Steven Wu <stevenz...@gmail.com> wrote:

Till, thanks for the follow-up. Looking forward to 1.5 :)

On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steven,

we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM would also be killed if a single TM gets quarantined. That is not a desired behaviour either.

With Flink 1.5 the problem with quarantining should be gone, since we no longer rely on Akka's death watch and instead use our own heartbeats.

Cheers,
Till

On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:

Till,

thanks for the clarification. Yes, that situation is undesirable too.

In our case, restarting the jobmanager could also recover the job from the Akka association lock-out. It was actually an issue (a high GC pause) on the jobmanager side that caused the Akka failure.

Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it make sense to terminate the jobmanager in this case?

Thanks,
Steven

On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steven,

the reason why we did not turn this feature on by default is that in case of a true JM failure, all of the TMs will think that they got quarantined, which triggers their shutdown. Depending on how many container restarts you have left on YARN, for example, this can lead to a situation where Flink is not able to recover the job even though it only needed to restart the JM container.

Cheers,
Till

On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:

Till,

We ran into the same issue. It started with a high GC pause that caused the jobmanager to lose its ZooKeeper connection and leadership, and caused the jobmanager to quarantine the taskmanager in Akka. Once quarantined, the Akka association between jobmanager and taskmanager is locked forever.

Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked. The taskmanager exited and a replacement taskmanager joined the cluster afterwards. I am wondering why this is not defaulted to "true". Any downside?

Thanks,
Steven

On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:

@Jelmer, this is Till's last response on the issue.

-- Ashish

On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi,

this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. From what I see in the logs, the following happens:

For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's threads that are responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from it. The only way to resolve this problem is to restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut itself down. If you run the Flink cluster on YARN, a substitute TM will then be started, provided you still have some container restarts left. That way, the system should be able to recover.

Additionally, you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.
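Concretely, that would look something like this in flink-conf.yaml (the values are only meant as a starting point, not a recommendation):

    taskmanager.exit-on-fatal-akka-error: true   # let a quarantined TM shut itself down
    akka.watch.heartbeat.interval: 10 s          # death-watch heartbeat interval (default: 10 s)
    akka.watch.heartbeat.pause: 120 s            # acceptable pause; raise this to tolerate longer GC pauses (default: 60 s)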
However, this only addresses the symptoms of the problem, and I'd like to find out what's causing it. In order to debug this further, it would be really helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load, so obtaining a profiler dump via VisualVM would be great. And last but not least, it also helps to learn more about the job you're running: What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job?
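For the logging part, a sketch of what I mean (adapt to your own log4j setup):

    # log4j.properties on the JobManager and TaskManagers
    log4j.rootLogger=DEBUG, file

    # flink-conf.yaml
    taskmanager.debug.memory.startLogThread: true
    taskmanager.debug.memory.logIntervalMs: 5000   # how often memory usage is logged, in milliseconds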
Thanks a lot for your help!

Cheers,
Till

On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:

I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the JobManager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do, I will post the actual log output.

This was on YARN in AWS, with akka.ask.timeout = 60s.

On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:

I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message the JM and is still alive.

Thanks, Ashish

On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:

Hi.

Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.

Best regards,
Lasse Nedergaard

On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:

Hi,

You should enable and check your garbage collection log. We've encountered cases where the TaskManager disassociated due to a long GC pause.
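For example, something along these lines in flink-conf.yaml turns GC logging on for the JVMs that Flink launches (the flags assume Java 8; the log path is just an example):

    env.java.opts: "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"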
Regards,
Kien

On 1/20/2018 1:27 AM, ashish pok wrote:

Hi All,

We have hit some load-related issues and were wondering if anyone has suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run them.

Here are excerpts from the logs that seemed relevant (I am trimming out the rest of the logs for brevity).

Job Manager:

2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager  - JVM Options:
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager  -     -Xms16384m
2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager  -     -Xmx16384m
2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager  -     -XX:+UseG1GC

2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher  - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.

-- So once the Flink session boots up, we hit it with pretty heavy load, which typically results in the WARN above.

Task Manager:

2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - JVM Options:
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -     -Xms16384M
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -     -Xmx16384M
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -     -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -     -XX:+UseG1GC

2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher  - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
2018-01-19 12:54:48,690 INFO  akka.remote.Remoting  - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN  akka.remote.Remoting  - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN  akka.remote.Remoting  - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
<bunch of ERRORs on operations not shut down properly - assuming because the JM is unreachable>

2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
2018-01-19 12:56:51,253 WARN  akka.remote.Remoting  - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

So the bottom line is, the JM and TM couldn't communicate under load, which is obviously not good. I tried to bump up akka.tcp.timeout as well, but it didn't help either.
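For reference, the timeout knobs I was referring to live in flink-conf.yaml, roughly like this (the values are only illustrative, not what we settled on):

    akka.tcp.timeout: 60 s    # outbound TCP connection timeout (default: 20 s)
    akka.ask.timeout: 60 s    # timeout for RPC asks, a related knob (default: 10 s)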
So my question here is: after all processing has halted and there is no new data being picked up, shouldn't this environment self-heal? Is there anything else I can be looking at other than extending timeouts?

Thanks,

Ashish