A clarification: in 1.5, with the custom heartbeats, are there additional configurations we should be concerned about?

On Fri, May 25, 2018 at 10:17 AM, Steven Wu <stevenz...@gmail.com> wrote:

Till, thanks for the follow-up. Looking forward to 1.5 :)

On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steven,

We don't have `jobmanager.exit-on-fatal-akka-error` because then the JM would also be killed if a single TM gets quarantined. That is not a desired behaviour either.

With Flink 1.5 the problem with quarantining should be gone, since we no longer rely on Akka's death watch and instead use our own heartbeats.

Cheers,
Till
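(For reference, the heartbeat service that replaces the Akka death watch in 1.5 has its own keys in flink-conf.yaml. A minimal sketch; the values shown are the 1.5 defaults as far as I know, so double-check them against the docs for your version:)

    # flink-conf.yaml (Flink 1.5+)
    heartbeat.interval: 10000   # ms between heartbeat requests to the TMs
    heartbeat.timeout: 50000    # ms without a heartbeat before the peer is considered lost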
On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:

Till,

Thanks for the clarification. Yes, that situation is undesirable as well.

In our case, restarting the jobmanager could also recover the job from the Akka association lock-out. It was actually the issue (a high GC pause) on the jobmanager side that caused the Akka failure.

Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it make sense to terminate the jobmanager in this case?

Thanks,
Steven

On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi Steven,

The reason why we did not turn on this feature by default was that in case of a true JM failure, all of the TMs will think that they got quarantined, which triggers their shutdown. Depending on how many container restarts you have left on Yarn, for example, this can lead to a situation where Flink is not able to recover the job even though it only needed to restart the JM container.

Cheers,
Till

On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:

Till,

We ran into the same issue. It started with a high GC pause that caused the jobmanager to lose its ZooKeeper connection and leadership, and caused the jobmanager to quarantine the taskmanager in Akka. Once quarantined, the Akka association between jobmanager and taskmanager is locked forever.

Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked. The taskmanager exited and a replacement taskmanager joined the cluster afterwards. I am wondering why this is not defaulted to "true". Any downside?

Thanks,
Steven

On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:

@Jelmer, this is Till's last response on the issue.

-- Ashish

On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann <trohrm...@apache.org> wrote:

Hi,

This sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. From what I see in the logs, the following happens:

For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's threads that are responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from it. The only way to resolve this problem is to restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster on Yarn, a new substitute TM will then be started, provided you still have some container restarts left. That way, the system should be able to recover.

Additionally, you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.
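(Put together, the settings mentioned above would go into flink-conf.yaml roughly as follows. The option names are the ones discussed in this thread; the two heartbeat values are only illustrative, not recommendations:)

    # flink-conf.yaml
    # Let a quarantined TaskManager exit so that a replacement can be started
    taskmanager.exit-on-fatal-akka-error: true
    # Akka death-watch tuning (pre-1.5); a larger pause tolerates longer GC stalls
    akka.watch.heartbeat.interval: 10 s
    akka.watch.heartbeat.pause: 120 s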
However, this only addresses the symptoms of the problem, and I'd like to find out what's causing it. In order to debug the problem further, it would be really helpful to obtain the logs of the JobManager and the TaskManagers at DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load, so obtaining a profiler dump via VisualVM would be great. And last but not least, it also helps to learn more about the job you're running. What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job?

Thanks a lot for your help!

Cheers,
Till

On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:

I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the Job Manager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do I will post the actual log output.

This was on YARN in AWS, with akka.ask.timeout = 60s.

On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:

I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern was: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message the JM, which is still alive.

Thanks, Ashish

On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:

Hi,

Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.

Best regards,
Lasse Nedergaard

On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:

Hi,

You should enable and check your garbage collection log. We've encountered cases where a Task Manager disassociated due to a long GC pause.

Regards,
Kien
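(One way to get such a GC log, sketched for a Java 8 setup: pass the usual GC-logging flags to the TaskManagers via env.java.opts.taskmanager in flink-conf.yaml; if that key is not available in your version, env.java.opts applies to all processes. The log path is just an example:)

    # flink-conf.yaml - enable GC logging on the TaskManagers (Java 8 flags)
    env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/flink/tm-gc.log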
On 1/20/2018 1:27 AM, ashish pok wrote:

Hi All,

We have hit some load related issues and was wondering if anyone has some suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run them.

Here are excerpts from the logs that seemed relevant (I am trimming out the rest of the logs for brevity).

*Job Manager:*
2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms16384m
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx16384m
2018-01-19 12:38:00,795 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:+UseG1GC

2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:53:34,671 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
2018-01-19 12:53:34,676 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
-- So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.

*Task Manager:*
2018-01-19 12:38:01,002 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC

2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384

2018-01-19 12:54:48,626 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
2018-01-19 12:54:48,690 INFO akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

<bunch of ERRORs about operations not shutting down properly - presumably because the JM is unreachable>

2018-01-19 12:56:51,244 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
2018-01-19 12:56:51,253 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

So the bottom line is that the JM and TM couldn't communicate under load, which is obviously not good. I tried to bump up akka.tcp.timeout as well, but it didn't help either.
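(For reference, the timeout keys mentioned in this thread are plain flink-conf.yaml entries; the values below are just an example of "bumping them up", not recommendations:)

    # flink-conf.yaml - Akka timeouts discussed in this thread
    akka.ask.timeout: 60 s
    akka.tcp.timeout: 60 s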
So my question here is: after all processing is halted and there is no new data being picked up, shouldn't this environment self-heal? Are there any other things I can be looking at other than extending timeouts?

Thanks,

Ashish