A clarification.. In 1.5 with custom heartbeats are there additional
configurations we should be concerned about ?

On Fri, May 25, 2018 at 10:17 AM, Steven Wu <stevenz...@gmail.com> wrote:

> Till, thanks for the follow-up. looking forward to 1.5 :)
>
> On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Steven,
>>
>> we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM
>> would also be killed if a single TM gets quarantined. This is also not a
>> desired behaviour.
>>
>> With Flink 1.5 the problem with quarantining should be gone since we
>> don't rely anymore on Akka's death watch and instead use our own heartbeats.
>>
>> Cheers,
>> Till
>>
>> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> Till,
>>>
>>> thanks for the clarification. yes, that situation is undesirable either.
>>>
>>> In our case, restarting jobmanager could also recover the job from akk
>>> association lock-out. it was actually the issue (high GC pause) on
>>> jobmanager side that caused the akka failure.
>>>
>>> do we have sth like "jobmanager.exit-on-fatal-akka-error: true"? does
>>> it make sense to terminate jobmanager in this case?
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Steven,
>>>>
>>>> the reason why we did not turn on this feature per default was that in
>>>> case of a true JM failure, all of the TMs will think that they got
>>>> quarantined which triggers their shut down. Depending on how many container
>>>> restarts you have left on Yarn, for example, this can lead to a situation
>>>> where Flink is not able to recover the job even though it needed to only
>>>> restart the JM container.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com>
>>>> wrote:
>>>>
>>>>> Till,
>>>>>
>>>>> We ran into the same issue. It started with high GC pause that caused
>>>>> jobmanager to lose zk conn and leadership and caused jobmanager to
>>>>> quarantine taskmanager in akka. Once quarantined, akka association btw
>>>>> jobmanager and taskmanager is locked forever.
>>>>>
>>>>> Your suggestion of " taskmanager.exit-on-fatal-akka-error: true"
>>>>> worked. taskmanager exited and replacement taskmanager joined the cluster
>>>>> afterwards. I am wondering why is this not defaulted to "true". Any
>>>>> downside?
>>>>>
>>>>> Thanks,
>>>>> Steven
>>>>>
>>>>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com>
>>>>> wrote:
>>>>>
>>>>>> @Jelmer, this is Till's las response on the issue.
>>>>>>
>>>>>> -- Ashish
>>>>>>
>>>>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>>>>> <trohrm...@apache.org> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> this sounds like a serious regression wrt Flink 1.3.2 and we should
>>>>>> definitely find out what's causing this problem. Given from what I see in
>>>>>> the logs, the following happens:
>>>>>>
>>>>>> For some time the JobManager seems to no longer receive heartbeats
>>>>>> from the TaskManager. This could be, for example, due to long GC pauses 
>>>>>> or
>>>>>> heavy load which starves the ActorSystem's threads which are responsible
>>>>>> for sending the heartbeats. Due to this, the TM's ActorSystem is
>>>>>> quarantined which effectively renders them useless because the JM will
>>>>>> henceforth ignore all messages from these systems. The only way to 
>>>>>> resolve
>>>>>> this problem is to restart the ActorSystem. By
>>>>>> setting taskmanager.exit-on-fatal-akka-error to true in
>>>>>> flink-conf.yaml, a quarantined TM will shut down. If you run the Flink
>>>>>> cluster on Yarn, then a new substitute TM will be started if you have 
>>>>>> still
>>>>>> some container restarts left. That way, the system should be able to
>>>>>> recover.
>>>>>>
>>>>>> Additionally you could try to play around
>>>>>> with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause
>>>>>> which control the heartbeat interval and the acceptable pause. By
>>>>>> increasing the latter, the system should tolerate longer GC pauses and
>>>>>> period of high load.
>>>>>>
>>>>>> However, this only addresses the symptoms of the problem and I'd like
>>>>>> to find out what's causing the problem. In order to further debug the
>>>>>> problem, it would be really helpful to obtain the logs of the JobManager
>>>>>> and the TaskManagers on DEBUG log level and with 
>>>>>> taskmanager.debug.memory.startLogThread
>>>>>> set to true. Additionally it would be interesting to see whats happening 
>>>>>> on
>>>>>> the TaskManagers when you observe high load. So obtaining a profiler dump
>>>>>> via VisualVM would be great. And last but not least, it also helps to 
>>>>>> learn
>>>>>> more about the job you're running. What kind of connectors is it using? 
>>>>>> Are
>>>>>> you using Flink's metric system? How is the Flink cluster deployed? Which
>>>>>> other libraries are you using in your job?
>>>>>>
>>>>>> Thanks a lot for your help!
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> I've seen a similar issue while running successive Flink SQL batches
>>>>>> on 1.4. In my case, the Job Manager would fail with the log output about
>>>>>> unreachability (with an additional statement about something going
>>>>>> "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
>>>>>> everything works perfectly, but we will try again soon on 1.4. When we 
>>>>>> do I
>>>>>> will post the actual log output.
>>>>>>
>>>>>> This was on YARN in AWS, with akka.ask.timeout = 60s.
>>>>>>
>>>>>> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <ashish...@yahoo.com
>>>>>> > wrote:
>>>>>>
>>>>>> I haven’t gotten much further with this. It doesn’t look like GC
>>>>>> related - at least GC counters were not that atrocious. However, my main
>>>>>> concern was once the load subsides why aren’t TM and JM connecting again?
>>>>>> That doesn’t look normal. I could definitely tell JM was listening on the
>>>>>> port and from logs it does appear TM is trying to message JM that is 
>>>>>> still
>>>>>> alive.
>>>>>>
>>>>>> Thanks, Ashish
>>>>>>
>>>>>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <
>>>>>> lassenederga...@gmail.com> wrote:
>>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> Did you find a reason for the detaching ?
>>>>>> I sometimes see the same on our system running Flink 1.4 on dc/os. I
>>>>>> have enabled taskmanager.Debug.memory.start logthread for debugging.
>>>>>>
>>>>>> Med venlig hilsen / Best regards
>>>>>> Lasse Nedergaard
>>>>>>
>>>>>>
>>>>>> Den 20. jan. 2018 kl. 12.57 skrev Kien Truong <
>>>>>> duckientru...@gmail.com>:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> You should enable and check your garbage collection log.
>>>>>>
>>>>>> We've encountered case where Task Manager disassociated due to long
>>>>>> GC pause.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Kien
>>>>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> We have hit some load related issues and was wondering if any one has
>>>>>> some suggestions. We are noticing task managers and job managers being
>>>>>> detached from each other under load and never really sync up again. As a
>>>>>> result, Flink session shows 0 slots available for processing. Even 
>>>>>> though,
>>>>>> apps are configured to restart it isn't really helping as there are no
>>>>>> slots available to run the apps.
>>>>>>
>>>>>>
>>>>>> Here are excerpt from logs that seemed relevant. (I am trimming out
>>>>>> rest of the logs for brevity)
>>>>>>
>>>>>> *Job Manager:*
>>>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -  Starting JobManager (Version: 1.4.0,
>>>>>> Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>>>
>>>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -  Maximum heap size: 16384 MiBytes
>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -  Hadoop version: 2.6.5
>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -  JVM Options:
>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -     -Xms16384m
>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -     -Xmx16384m
>>>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                -     -XX:+UseG1GC
>>>>>>
>>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration
>>>>>> .GlobalConfiguration            - Loading configuration property:
>>>>>> jobmanager.rpc.port, 6123
>>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration
>>>>>> .GlobalConfiguration            - Loading configuration property:
>>>>>> jobmanager.heap.mb, 16384
>>>>>>
>>>>>>
>>>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher
>>>>>>                        - Detected unreachable: 
>>>>>> [akka.tcp://flink@<jm-host>:37
>>>>>> 840]
>>>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobma
>>>>>> nager.JobManager                - Task manager 
>>>>>> akka.tcp://flink@<jm-host>:378
>>>>>> 40/user/taskmanager terminated.
>>>>>>
>>>>>> -- So once Flink session boots up, we are hitting it with pretty
>>>>>> heavy load, which typically results in the WARN above
>>>>>>
>>>>>> *Task Manager:*
>>>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -  Starting TaskManager (Version: 1.4.0,
>>>>>> Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -  Hadoop version: 2.6.5
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -  JVM Options:
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -     -Xms16384M
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -     -Xmx16384M
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -     -XX:MaxDirectMemorySize=83886 07T
>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              -     -XX:+UseG1GC
>>>>>>
>>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration
>>>>>> .GlobalConfiguration            - Loading configuration property:
>>>>>> jobmanager.rpc.port, 6123
>>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration
>>>>>> .GlobalConfiguration            - Loading configuration property:
>>>>>> jobmanager.heap.mb, 16384
>>>>>>
>>>>>>
>>>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher
>>>>>>                        - Detected unreachable: 
>>>>>> [akka.tcp://flink@<jm-host>:61
>>>>>> 23]
>>>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting
>>>>>>                       - Quarantined address 
>>>>>> [akka.tcp://flink@<jm-host>:61
>>>>>> 23] is still unreachable or has not been restarted. Keeping it
>>>>>> quarantined.
>>>>>> 018-01-19 12:54:48,774 WARN  akka.remote.Remoting
>>>>>>                       - Tried to associate with unreachable remote 
>>>>>> address [akka.tcp://flink@<tm-host>:61
>>>>>> 23]. Address is now gated for 5000 ms, all messages to this address
>>>>>> will be delivered to dead letters. Reason: [The remote system has a UID
>>>>>> that has been quarantined. Association aborted.]
>>>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting
>>>>>>                       - Tried to associate with unreachable remote 
>>>>>> address [akka.tcp://flink@<tm-host>:61
>>>>>> 23]. Address is now gated for 5000 ms, all messages to this address
>>>>>> will be delivered to dead letters. Reason: [The remote system has
>>>>>> quarantined this system. No further associations to the remote system are
>>>>>> possible until this system is restarted.]
>>>>>> <bunch of ERRORs on operations not shutdown properly - assuming
>>>>>> because JM is unreachable>
>>>>>>
>>>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskm
>>>>>> anager.TaskManager              - Trying to register at JobManager 
>>>>>> akka.tcp://flink@<jm-host>:612
>>>>>> 3/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting
>>>>>>                       - Tried to associate with unreachable remote 
>>>>>> address [akka.tcp://flink@<jm-host>:61
>>>>>> 23]. Address is now gated for 5000 ms, all messages to this address
>>>>>> will be delivered to dead letters. Reason: [The remote system has
>>>>>> quarantined this system. No further associations to the remote system are
>>>>>> possible until this system is restarted.]
>>>>>>
>>>>>> So bottom line is, JM and TM couldn't communicate under load, which
>>>>>> is obviously not good. I tried to bump up akka.tcp.timeout as well but it
>>>>>> didnt help either. So my question here is after all processing is halted
>>>>>> and there is no new data being picked up, shouldn't this environment
>>>>>> self-heal? Any other things I can be looking at other than extending
>>>>>> timeouts?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Ashish
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to