Hi Till,

Thanks for the detailed response. I will try to gather some of this information 
during the week and follow up.

— Ashish

> On Feb 5, 2018, at 5:55 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi,
> 
> this sounds like a serious regression with respect to Flink 1.3.2, and we 
> should definitely find out what's causing it. From what I see in the logs, 
> the following happens:
> 
> For some time the JobManager seems to no longer receive heartbeats from the 
> TaskManager. This could be, for example, due to long GC pauses or heavy load 
> which starves the ActorSystem's threads responsible for sending the 
> heartbeats. Due to this, the TM's ActorSystem is quarantined, which 
> effectively renders it useless because the JM will henceforth ignore all 
> messages from that system. The only way to resolve this problem is to 
> restart the ActorSystem. By setting taskmanager.exit-on-fatal-akka-error to 
> true in flink-conf.yaml, a quarantined TM will shut itself down. If you run 
> the Flink cluster on YARN, a new substitute TM will then be started, provided 
> you still have some container restarts left. That way, the system should be 
> able to recover.
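> 
> For reference, a minimal flink-conf.yaml snippet for this (the option 
> defaults to false):
> 
>     # Shut the TaskManager process down when its ActorSystem gets quarantined,
>     # so that a fresh ActorSystem (on YARN: a fresh container) can take over.
>     taskmanager.exit-on-fatal-akka-error: true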
> 
> Additionally, you could try to play around with akka.watch.heartbeat.interval 
> and akka.watch.heartbeat.pause, which control the heartbeat interval and the 
> acceptable pause. By increasing the latter, the system should tolerate longer 
> GC pauses and periods of high load.
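> 
> As a rough sketch (the values below are only examples and need tuning for 
> your workload; the pause should be a generous multiple of the interval):
> 
>     # how often death-watch heartbeats are sent between JM and TMs
>     akka.watch.heartbeat.interval: 10 s
>     # how long missing heartbeats are tolerated before a peer is marked unreachable
>     akka.watch.heartbeat.pause: 120 s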
> 
> However, this only addresses the symptoms, and I'd like to find out what's 
> causing the problem. In order to debug it further, it would be really 
> helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG 
> log level and with taskmanager.debug.memory.startLogThread set to true. 
> Additionally, it would be interesting to see what's happening on the 
> TaskManagers when you observe high load, so obtaining a profiler dump via 
> VisualVM would be great. And last but not least, it also helps to learn more 
> about the job you're running: What kind of connectors is it using? Are you 
> using Flink's metric system? How is the Flink cluster deployed? Which other 
> libraries are you using in your job?
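> 
> In case it helps, this is roughly what that configuration would look like, 
> assuming the default log4j setup that ships with Flink:
> 
>     # conf/log4j.properties: raise the root log level
>     log4j.rootLogger=DEBUG, file
> 
>     # flink-conf.yaml: periodically log memory usage on each TaskManager
>     taskmanager.debug.memory.startLogThread: true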
> 
> Thanks a lot for your help!
> 
> Cheers,
> Till
> 
> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com> wrote:
> I've seen a similar issue while running successive Flink SQL batches on 1.4. 
> In my case, the JobManager would fail with log output about unreachability 
> (with an additional statement about something going "horribly wrong"). Under 
> workload pressure, I reverted to 1.3.2, where everything works perfectly, 
> but we will try again soon on 1.4. When we do, I will post the actual log 
> output.
> 
> This was on YARN in AWS, with akka.ask.timeout = 60s.
> 
> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com> wrote:
> I haven't gotten much further with this. It doesn't look GC related - at 
> least the GC counters were not that atrocious. However, my main concern is: 
> once the load subsides, why aren't the TM and JM connecting again? That 
> doesn't look normal. I could definitely tell the JM was listening on its 
> port, and from the logs it does appear the TM is trying to tell the JM that 
> it is still alive.
> 
> Thanks, Ashish
> 
>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <lassenederga...@gmail.com> wrote:
>> 
>> Hi. 
>> 
>> Did you find a reason for the detaching?
>> I sometimes see the same on our system running Flink 1.4 on DC/OS. I have 
>> enabled taskmanager.debug.memory.startLogThread for debugging. 
>> 
>> Best regards
>> Lasse Nedergaard
>> 
>> 
>> On 20 Jan 2018, at 12:57, Kien Truong <duckientru...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> You should enable and check your garbage collection log.
>>> 
>>> We've encountered cases where the Task Manager disassociated due to long 
>>> GC pauses.
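>>> 
>>> As a sketch, GC logging could be enabled through flink-conf.yaml roughly 
>>> like this (the flags are for Java 8; the log path is just a placeholder):
>>> 
>>>     env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/path/to/gc.log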
>>> 
>>> Regards,
>>> 
>>> Kien
>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>> Hi All,
>>>> 
>>>> We have hit some load related issues and were wondering if anyone has 
>>>> suggestions. We are noticing task managers and job managers being detached 
>>>> from each other under load and never really syncing up again. As a result, 
>>>> the Flink session shows 0 slots available for processing. Even though the 
>>>> apps are configured to restart, that isn't really helping, as there are no 
>>>> slots available to run them.
>>>> 
>>>> 
>>>> Here are excerpts from the logs that seemed relevant. (I am trimming out 
>>>> the rest of the logs for brevity.)
>>>> 
>>>> Job Manager:
>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>> 
>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -Xms16384m
>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -Xmx16384m
>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager -    -XX:+UseG1GC
>>>> 
>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>> 
>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>> 
>>>> -- So once the Flink session boots up, we are hitting it with pretty heavy 
>>>> load, which typically results in the WARN above.
>>>> 
>>>> Task Manager:
>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -Xms16384M
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -Xmx16384M
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -XX:MaxDirectMemorySize=8388607T
>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager -    -XX:+UseG1GC
>>>> 
>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
>>>> 
>>>> 
>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>> <bunch of ERRORs about operations not shut down properly - assuming 
>>>> because the JM is unreachable>
>>>> 
>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>> 
>>>> So the bottom line is: the JM and TM couldn't communicate under load, 
>>>> which is obviously not good. I tried to bump up akka.tcp.timeout as well, 
>>>> but it didn't help either. So my question is: after all processing has 
>>>> halted and there is no new data being picked up, shouldn't this 
>>>> environment self-heal? Are there any other things I can look at other 
>>>> than extending timeouts?
>>>> 
>>>> Thanks,
>>>> 
>>>> Ashish
>>>> 
> 
> 
> 
