Till, thanks for the follow-up. Looking forward to 1.5 :)

On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Steven,
>
> we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM would also be killed if a single TM gets quarantined. This is also not a desired behaviour.
>
> With Flink 1.5 the problem with quarantining should be gone, since we no longer rely on Akka's death watch and instead use our own heartbeats.
>
> Cheers,
> Till
>
> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till,
>>
>> thanks for the clarification. Yes, that situation is not desirable either.
>>
>> In our case, restarting the jobmanager could also recover the job from the akka association lock-out. It was actually an issue (a high GC pause) on the jobmanager side that caused the akka failure.
>>
>> Do we have something like "jobmanager.exit-on-fatal-akka-error: true"? Does it make sense to terminate the jobmanager in this case?
>>
>> Thanks,
>> Steven
>>
>> On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Steven,
>>>
>>> the reason why we did not turn this feature on by default was that in case of a true JM failure, all of the TMs will think that they got quarantined, which triggers their shutdown. Depending on how many container restarts you have left on Yarn, for example, this can lead to a situation where Flink is not able to recover the job even though it only needed to restart the JM container.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Till,
>>>>
>>>> We ran into the same issue. It started with a high GC pause that caused the jobmanager to lose its ZooKeeper connection and leadership, and caused the jobmanager to quarantine the taskmanager in akka. Once quarantined, the akka association between jobmanager and taskmanager is locked forever.
>>>>
>>>> Your suggestion of "taskmanager.exit-on-fatal-akka-error: true" worked. The taskmanager exited and a replacement taskmanager joined the cluster afterwards. I am wondering why this is not defaulted to "true". Any downside?
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com> wrote:
>>>>
>>>>> @Jelmer, this is Till's last response on the issue.
>>>>>
>>>>> -- Ashish
>>>>>
>>>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. From what I see in the logs, the following happens:
>>>>>
>>>>> For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's threads that are responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from this system. The only way to resolve this problem is to restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster on Yarn, a new substitute TM will then be started if you still have some container restarts left. That way, the system should be able to recover.
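For reference, a minimal flink-conf.yaml sketch of the setting Till describes above. The key name is taken from this thread; the comments and values are assumptions worth verifying against the documentation for your Flink version:

    # flink-conf.yaml (sketch only)
    # Let a TaskManager process exit when its ActorSystem gets quarantined,
    # so the cluster framework (e.g. YARN) can start a replacement container.
    taskmanager.exit-on-fatal-akka-error: true

As Till notes earlier in the thread, the trade-off is that a genuine JM failure can make every TM believe it was quarantined and shut down, burning through the available container restarts.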
>>>>> Additionally, you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.
>>>>>
>>>>> However, this only addresses the symptoms of the problem and I'd like to find out what's causing it. In order to further debug the problem, it would be really helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load, so obtaining a profiler dump via VisualVM would be great. And last but not least, it also helps to learn more about the job you're running. What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job?
>>>>>
>>>>> Thanks a lot for your help!
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
>>>>>
>>>>> I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the Job Manager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do I will post the actual log output.
>>>>>
>>>>> This was on YARN in AWS, with akka.ask.timeout = 60s.
>>>>>
>>>>> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
>>>>>
>>>>> I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message the JM, which is still alive.
>>>>>
>>>>> Thanks, Ashish
>>>>>
>>>>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>>>>
>>>>> Hi.
>>>>>
>>>>> Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.
>>>>>
>>>>> Best regards
>>>>> Lasse Nedergaard
>>>>>
>>>>> On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> You should enable and check your garbage collection log. We've encountered cases where a Task Manager disassociated due to a long GC pause.
>>>>>
>>>>> Regards,
>>>>> Kien
>>>>>
>>>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> We have hit some load-related issues and was wondering if anyone has some suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run the apps.
>>>>> Here are excerpts from the logs that seemed relevant. (I am trimming out the rest of the logs for brevity.)
>>>>>
>>>>> *Job Manager:*
>>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -     -Xms16384m
>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -     -Xmx16384m
>>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager -     -XX:+UseG1GC
>>>>>
>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>
>>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>>>
>>>>> -- So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.
>>>>>
>>>>> *Task Manager:*
>>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xms16384M
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -Xmx16384M
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -XX:MaxDirectMemorySize=8388607T
>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -     -XX:+UseG1GC
>>>>>
>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>
>>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>> <bunch of ERRORs on operations not shut down properly - assuming because JM is unreachable>
>>>>>
>>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>>
>>>>> So the bottom line is that the JM and TM couldn't communicate under load, which is obviously not good. I tried to bump up akka.tcp.timeout as well, but it didn't help either. So my question here is: after all processing is halted and there is no new data being picked up, shouldn't this environment self-heal? Are there any other things I can be looking at other than extending timeouts?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ashish
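To make the thread easier to act on, here is one hedged flink-conf.yaml sketch collecting the knobs mentioned above. The key names all appear in this thread (the Flink 1.4-era, Akka-based runtime); the values are illustrative assumptions only, not recommendations, so check the defaults and exact semantics for your Flink version:

    # flink-conf.yaml - illustrative sketch of the settings discussed in this thread
    # Let a quarantined TaskManager terminate so a replacement can be started.
    taskmanager.exit-on-fatal-akka-error: true
    # Akka death-watch heartbeats: a larger pause tolerates longer GC pauses
    # and periods of high load before a peer is marked unreachable.
    akka.watch.heartbeat.interval: 10 s
    akka.watch.heartbeat.pause: 120 s
    # RPC ask timeout (Cliff ran with 60s on YARN in AWS).
    akka.ask.timeout: 60 s
    # Periodically log TaskManager memory usage for debugging, as Till suggested.
    taskmanager.debug.memory.startLogThread: true

Note that, per Till's follow-up at the top of the thread, Flink 1.5 replaces Akka's death watch with Flink's own heartbeats, so the quarantining problem itself should no longer occur there.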