I haven't gotten much further with this. It doesn't look GC related - at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren't the TM and JM connecting again? That doesn't look normal. I could definitely tell the JM was listening on its port, and from the logs it does appear the TM is trying to message a JM that is still alive.
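
For anyone who wants to double-check GC the same way, something along these lines in flink-conf.yaml should do it on Flink 1.4 - a rough sketch only: the GC log path is just an example, and I'm going from memory on the exact taskmanager.debug.memory key names, so please verify against the 1.4 docs.

    # Write detailed GC logs for the Flink JVMs (the path is illustrative)
    env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log

    # Periodically log heap / direct / off-heap usage from the TaskManagers
    taskmanager.debug.memory.startLogThread: true
    taskmanager.debug.memory.logIntervalMs: 5000

With that in place, long stop-the-world pauses should be visible directly in the GC log.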
Thanks,

Ashish

> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>
> Hi.
>
> Did you find a reason for the detaching?
> I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
>
>> Hi,
>>
>> You should enable and check your garbage collection log.
>>
>> We've encountered cases where the Task Manager disassociated due to a long GC pause.
>>
>> Regards,
>>
>> Kien
>>
>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>> Hi All,
>>>
>>> We have hit some load-related issues and were wondering if anyone has suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, that isn't really helping, as there are no slots available to run them.
>>>
>>> Here are excerpts from the logs that seemed relevant. (I am trimming out the rest of the logs for brevity.)
>>>
>>> Job Manager:
>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>
>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -Xms16384m
>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -Xmx16384m
>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -XX:+UseG1GC
>>>
>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>
>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>
>>> -- So once the Flink session boots up, we hit it with pretty heavy load, which typically results in the WARN above.
>>>
>>> Task Manager:
>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -Xms16384M
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -Xmx16384M
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -XX:MaxDirectMemorySize=8388607T
>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -XX:+UseG1GC
>>>
>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>
>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>> <bunch of ERRORs on operations not shut down properly - assuming because the JM is unreachable>
>>>
>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>
>>> So the bottom line is: the JM and TM couldn't communicate under load, which is obviously not good. I tried bumping up akka.tcp.timeout as well, but it didn't help either. So my question here is: after all processing is halted and there is no new data being picked up, shouldn't this environment self-heal? Are there any other things I can be looking at other than extending timeouts?
>>>
>>> Thanks,
>>>
>>> Ashish
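
PS: besides akka.tcp.timeout (mentioned in my original mail above), these are the Flink 1.4 settings I understand govern the death-watch and remote-connection behavior that precedes the quarantine in the logs. The values below are placeholders to show the knobs, not a tested recommendation:

    # How the JM/TM death watch decides a peer is unreachable
    akka.watch.heartbeat.interval: 30 s
    akka.watch.heartbeat.pause: 120 s

    # Timeouts for akka ask calls and outbound TCP connections
    akka.ask.timeout: 60 s
    akka.tcp.timeout: 60 s

Even with those raised, my understanding from the "No further associations ... until this system is restarted" message is that once the two sides have quarantined each other, only a restart of the quarantined process clears it, which may be why it never heals in place.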