Hi Till,

Thanks for the detailed response. I will try to gather some of this information during the week and follow up.

— Ashish

> On Feb 5, 2018, at 5:55 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi,
>
> this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem. From what I see in the logs, the following happens:
>
> For some time the JobManager no longer receives heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load that starves the ActorSystem threads responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from it. The only way to resolve this problem is to restart the ActorSystem. If you set taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut itself down. If you run the Flink cluster on YARN, a substitute TM will then be started, provided you still have some container restarts left. That way, the system should be able to recover.
>
> Additionally, you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.
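>
> For reference, those settings would look roughly like this in flink-conf.yaml (the heartbeat values below are only illustrative starting points, not recommendations; you would tune the pause to your workload):
>
>     taskmanager.exit-on-fatal-akka-error: true
>     akka.watch.heartbeat.interval: 10 s
>     akka.watch.heartbeat.pause: 60 s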
>
> However, this only addresses the symptoms of the problem, and I'd like to find out its root cause. In order to debug it further, it would be really helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally, it would be interesting to see what's happening on the TaskManagers when you observe high load, so obtaining a profiler dump via VisualVM would be great. And last but not least, it also helps to learn more about the job you're running: What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job?
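>
> Concretely, that means something like the following (the log4j line assumes the log4j.properties file that ships with Flink, where the root logger defaults to INFO):
>
>     # flink-conf.yaml
>     taskmanager.debug.memory.startLogThread: true
>
>     # log4j.properties
>     log4j.rootLogger=DEBUG, file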
>
> Thanks a lot for your help!
>
> Cheers,
> Till
>
> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
> I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the JobManager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2, where everything works perfectly, but we will try again soon on 1.4. When we do, I will post the actual log output.
>
> This was on YARN in AWS, with akka.ask.timeout = 60s.
>
> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on its port, and from the logs it does appear the TM is trying to message the JM that it is still alive.
>
> Thanks, Ashish
>
>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>>
>> Hi.
>>
>> Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>> On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> You should enable and check your garbage collection log. We've encountered cases where the TaskManager disassociated due to long GC pauses.
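>>>
>>> On Java 8 you can enable it through env.java.opts in flink-conf.yaml, along these lines (the log path is only an example):
>>>
>>>     env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"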
>>>
>>> Regards,
>>> Kien
>>>
>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>> Hi All,
>>>>
>>>> We have hit some load-related issues and were wondering if anyone has suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run them.
>>>>
>>>> Here are excerpts from the logs that seemed relevant (I am trimming out the rest for brevity).
>>>>
>>>> Job Manager:
>>>>
>>>> 2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>> 2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
>>>> 2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
>>>> 2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager -    -Xms16384m
>>>> 2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager -    -Xmx16384m
>>>> 2018-01-19 12:38:00,795 INFO org.apache.flink.runtime.jobmanager.JobManager -    -XX:+UseG1GC
>>>> 2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>>
>>>> 2018-01-19 12:53:34,671 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>>> 2018-01-19 12:53:34,676 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>>
>>>> -- So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.
>>>>
>>>> Task Manager:
>>>>
>>>> 2018-01-19 12:38:01,002 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager -    -Xms16384M
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager -    -Xmx16384M
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager -    -XX:MaxDirectMemorySize=8388607T
>>>> 2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager -    -XX:+UseG1GC
>>>> 2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>>
>>>> 2018-01-19 12:54:48,626 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>>> 2018-01-19 12:54:48,690 INFO akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>>> 2018-01-19 12:54:48,774 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>>> 2018-01-19 12:54:48,833 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>
>>>> <bunch of ERRORs on operations not shut down properly - assuming because the JM is unreachable>
>>>>
>>>> 2018-01-19 12:56:51,244 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>> 2018-01-19 12:56:51,253 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>
>>>> So the bottom line is, the JM and TM couldn't communicate under load, which is obviously not good. I tried to bump up akka.tcp.timeout as well, but it didn't help either.
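>>>>
>>>> For reference, the kind of bump I mean, in flink-conf.yaml (the value here is illustrative, not the exact one I tested):
>>>>
>>>>     akka.tcp.timeout: 60 s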
>>>>
>>>> So my question here is: after all processing is halted and there is no new data being picked up, shouldn't this environment self-heal? Any other things I can be looking at other than extending timeouts?
>>>>
>>>> Thanks,
>>>>
>>>> Ashish