I haven’t gotten much further with this. It doesn’t look GC related; at 
least the GC counters were not that atrocious. My main concern, however, is 
why TM and JM aren’t connecting again once the load subsides. That doesn’t 
look normal. I could definitely tell JM was listening on the port, and from 
the logs it does appear TM is still trying to message JM, which is still alive. 
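
In case it helps anyone looking at a similar setup, here is a minimal sketch (plain Python, not part of Flink) for pulling the quarantine warnings out of a TaskManager log. The marker strings are copied from the akka.remote.Remoting WARN lines quoted below; the function name, script name, and log path are made up for illustration. Those warnings say that no further associations are possible until this system is restarted, which may be why the TM never re-registers on its own, but I have not confirmed that.

```python
from typing import Iterable, List

# Marker substrings copied from the akka.remote.Remoting WARN lines in the
# TaskManager log excerpt quoted in this thread.
QUARANTINE_MARKERS = (
    "has quarantined this system",
    "has a UID that has been quarantined",
)


def find_quarantine_lines(lines: Iterable[str]) -> List[str]:
    """Return the log lines that mention an Akka quarantine (hypothetical helper)."""
    return [
        line.rstrip("\n")
        for line in lines
        if any(marker in line for marker in QUARANTINE_MARKERS)
    ]


if __name__ == "__main__":
    import sys

    # Usage: python find_quarantine.py /path/to/taskmanager.log
    # (the script name and the path are placeholders)
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log_file:
        for hit in find_quarantine_lines(log_file):
            print(hit)
```

If this prints anything for a TM that is still running, that TM has been quarantined according to its own log, and waiting for the load to subside would presumably not be enough.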

Thanks, Ashish

> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> 
> wrote:
> 
> Hi. 
> 
> Did you find a reason for the detaching?
> I sometimes see the same on our system running Flink 1.4 on DC/OS. I have 
> enabled taskmanager.debug.memory.startLogThread for debugging. 
> 
> Best regards
> Lasse Nedergaard
> 
> 
> On Jan 20, 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
> 
>> Hi,
>> 
>> You should enable and check your garbage collection log.
>> 
>> We've encountered cases where the Task Manager was disassociated due to a long GC pause.
>> 
>> Regards,
>> 
>> Kien
>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>> Hi All,
>>> 
>>> We have hit some load-related issues and were wondering if anyone has 
>>> suggestions. We are noticing task managers and job managers being detached 
>>> from each other under load and never really syncing up again. As a result, 
>>> the Flink session shows 0 slots available for processing. Even though the 
>>> apps are configured to restart, it isn't really helping, as there are no 
>>> slots available to run them.
>>> 
>>> 
>>> Here are excerpts from the logs that seemed relevant. (I am trimming out 
>>> the rest of the logs for brevity.)
>>> 
>>> Job Manager:
>>> 2018-01-19 12:38:00,423 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -  Starting 
>>> JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 
>>> 2018-01-19 12:38:00,792 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -  Maximum 
>>> heap size: 16384 MiBytes
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -  JVM 
>>> Options:
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -     
>>> -Xms16384m
>>> 2018-01-19 12:38:00,794 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -     
>>> -Xmx16384m
>>> 2018-01-19 12:38:00,795 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                -     
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration            - Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:00,908 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration            - Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                     
>>>                 - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>> 2018-01-19 12:53:34,676 INFO  
>>> org.apache.flink.runtime.jobmanager.JobManager                - Task 
>>> manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>> 
>>> -- So once the Flink session boots up, we hit it with a pretty heavy 
>>> load, which typically results in the WARN above
>>> 
>>> Task Manager:
>>> 2018-01-19 12:38:01,002 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -  Starting 
>>> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop 
>>> version: 2.6.5
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -  JVM 
>>> Options:
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -     
>>> -Xms16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -     
>>> -Xmx16384M
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -     
>>> -XX:MaxDirectMemorySize=8388607T
>>> 2018-01-19 12:38:01,367 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              -     
>>> -XX:+UseG1GC
>>> 
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration            - Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2018-01-19 12:38:01,392 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration            - Loading 
>>> configuration property: jobmanager.heap.mb, 16384
>>> 
>>> 
>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                     
>>>                 - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                          
>>>                 - Quarantined address [akka.tcp://flink@<jm-host>:6123] is 
>>> still unreachable or has not been restarted. Keeping it quarantined.
>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting                          
>>>                - Tried to associate with unreachable remote address 
>>> [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all 
>>> messages to this address will be delivered to dead letters. Reason: [The 
>>> remote system has a UID that has been quarantined. Association aborted.] 
>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                          
>>>                 - Tried to associate with unreachable remote address 
>>> [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all 
>>> messages to this address will be delivered to dead letters. Reason: [The 
>>> remote system has quarantined this system. No further associations to the 
>>> remote system are possible until this system is restarted.] 
>>> <bunch of ERRORs on operations not shut down properly - presumably because 
>>> JM is unreachable>
>>> 
>>> 2018-01-19 12:56:51,244 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager              - Trying to 
>>> register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager 
>>> (attempt 10, timeout: 30000 milliseconds)
>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                          
>>>                 - Tried to associate with unreachable remote address 
>>> [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all 
>>> messages to this address will be delivered to dead letters. Reason: [The 
>>> remote system has quarantined this system. No further associations to the 
>>> remote system are possible until this system is restarted.] 
>>> 
>>> So the bottom line is that JM and TM couldn't communicate under load, which 
>>> is obviously not good. I tried bumping up akka.tcp.timeout as well, but it 
>>> didn't help either. So my question is: after all processing has halted 
>>> and no new data is being picked up, shouldn't this environment 
>>> self-heal? Is there anything else I can look at other than extending 
>>> timeouts?
>>> 
>>> Thanks,
>>> 
>>> Ashish
