Hi Vishal,

you should not need to configure anything else.

Cheers,
Till

On Sat, Jun 30, 2018 at 7:23 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> A clarification: in 1.5, with custom heartbeats, are there additional
> configurations we should be concerned about?
>
> On Fri, May 25, 2018 at 10:17 AM, Steven Wu <stevenz...@gmail.com> wrote:
>
>> Till, thanks for the follow-up. looking forward to 1.5 :)
>>
>> On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Steven,
>>>
>>> we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM
>>> would also be killed if a single TM gets quarantined. This is also not a
>>> desired behaviour.
>>>
>>> With Flink 1.5 the problem with quarantining should be gone since we
>>> no longer rely on Akka's death watch and instead use our own heartbeats.
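>>>
>>> For reference, a minimal flink-conf.yaml sketch of the keys that govern
>>> these heartbeats. The key names and the millisecond values are an
>>> assumption taken from the Flink 1.5 configuration options, not something
>>> stated in this thread, and the defaults normally need no change.
>>>
>>> ```yaml
>>> # Assumed Flink 1.5 heartbeat options (values in milliseconds).
>>> heartbeat.interval: 10000   # how often heartbeat requests are sent
>>> heartbeat.timeout: 50000    # how long to wait before declaring a peer lost
>>> ```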
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, May 14, 2018 at 1:07 AM, Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> Till,
>>>>
>>>> thanks for the clarification. yes, that situation is undesirable as well.
>>>>
>>>> In our case, restarting the jobmanager could also recover the job from the
>>>> akka association lock-out. it was actually the issue (high GC pause) on the
>>>> jobmanager side that caused the akka failure.
>>>>
>>>> do we have something like "jobmanager.exit-on-fatal-akka-error: true"? does
>>>> it make sense to terminate the jobmanager in this case?
>>>>
>>>> Thanks,
>>>> Steven
>>>>
>>>> On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Steven,
>>>>>
>>>>> the reason why we did not turn on this feature by default is that in
>>>>> case of a true JM failure, all of the TMs will think that they got
>>>>> quarantined, which triggers their shutdown. Depending on how many
>>>>> container restarts you have left on Yarn, for example, this can lead to a
>>>>> situation where Flink is not able to recover the job even though it only
>>>>> needed to restart the JM container.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Till,
>>>>>>
>>>>>> We ran into the same issue. It started with a high GC pause that caused
>>>>>> the jobmanager to lose its zk connection and leadership, and caused the
>>>>>> jobmanager to quarantine the taskmanager in akka. Once quarantined, the
>>>>>> akka association between jobmanager and taskmanager is locked forever.
>>>>>>
>>>>>> Your suggestion of "taskmanager.exit-on-fatal-akka-error: true"
>>>>>> worked. The taskmanager exited and a replacement taskmanager joined the
>>>>>> cluster afterwards. I am wondering why this is not defaulted to "true".
>>>>>> Any downside?
>>>>>>
>>>>>> Thanks,
>>>>>> Steven
>>>>>>
>>>>>> On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <ashish...@yahoo.com>
>>>>>> wrote:
>>>>>>
>>>>>>> @Jelmer, this is Till's last response on the issue.
>>>>>>>
>>>>>>> -- Ashish
>>>>>>>
>>>>>>> On Mon, Feb 5, 2018 at 5:56 AM, Till Rohrmann
>>>>>>> <trohrm...@apache.org> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> this sounds like a serious regression wrt Flink 1.3.2 and we should
>>>>>>> definitely find out what's causing this problem. From what I see in
>>>>>>> the logs, the following happens:
>>>>>>>
>>>>>>> For some time the JobManager seems to no longer receive heartbeats
>>>>>>> from the TaskManager. This could be, for example, due to long GC pauses
>>>>>>> or heavy load which starves the ActorSystem's threads that are
>>>>>>> responsible for sending the heartbeats. Due to this, the TM's
>>>>>>> ActorSystem is quarantined, which effectively renders the TM useless
>>>>>>> because the JM will henceforth ignore all messages from that system. The
>>>>>>> only way to resolve this problem is to restart the ActorSystem. By
>>>>>>> setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml,
>>>>>>> a quarantined TM will shut down. If you run the Flink cluster on Yarn,
>>>>>>> then a new substitute TM will be started if you still have some
>>>>>>> container restarts left. That way, the system should be able to recover.
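>>>>>>>
>>>>>>> A minimal flink-conf.yaml sketch of the setting described above. Only
>>>>>>> the key and value come from this thread; the comment is illustrative.
>>>>>>>
>>>>>>> ```yaml
>>>>>>> # Let a quarantined TaskManager exit so that Yarn (or another
>>>>>>> # resource manager) can start a replacement container.
>>>>>>> taskmanager.exit-on-fatal-akka-error: true
>>>>>>> ```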
>>>>>>>
>>>>>>> Additionally you could try to play around with
>>>>>>> akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which
>>>>>>> control the heartbeat interval and the acceptable pause. By increasing
>>>>>>> the latter, the system should tolerate longer GC pauses and periods of
>>>>>>> high load.
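>>>>>>>
>>>>>>> As a rough sketch, the two keys could be set in flink-conf.yaml as
>>>>>>> follows. The values are illustrative assumptions only (not
>>>>>>> recommendations from this thread) and should be tuned to the GC pauses
>>>>>>> actually observed.
>>>>>>>
>>>>>>> ```yaml
>>>>>>> # Interval between death-watch heartbeats (illustrative value).
>>>>>>> akka.watch.heartbeat.interval: 10 s
>>>>>>> # Acceptable heartbeat pause before a peer is considered unreachable;
>>>>>>> # raising it tolerates longer GC pauses (illustrative value).
>>>>>>> akka.watch.heartbeat.pause: 120 s
>>>>>>> ```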
>>>>>>>
>>>>>>> However, this only addresses the symptoms of the problem and I'd
>>>>>>> like to find out what's causing the problem. In order to further debug
>>>>>>> the problem, it would be really helpful to obtain the logs of the
>>>>>>> JobManager and the TaskManagers on DEBUG log level and with
>>>>>>> taskmanager.debug.memory.startLogThread set to true. Additionally, it
>>>>>>> would be interesting to see what's happening on the TaskManagers when
>>>>>>> you observe high load. So obtaining a profiler dump via VisualVM would
>>>>>>> be great. And last but not least, it also helps to learn more about the
>>>>>>> job you're running. What kind of connectors is it using? Are you using
>>>>>>> Flink's metric system? How is the Flink cluster deployed? Which other
>>>>>>> libraries are you using in your job?
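>>>>>>>
>>>>>>> A minimal flink-conf.yaml sketch for the memory logging mentioned
>>>>>>> above. The DEBUG log level itself is set in the logging configuration,
>>>>>>> not in this file, so it is not shown here.
>>>>>>>
>>>>>>> ```yaml
>>>>>>> # Start the TaskManager thread that periodically logs memory usage.
>>>>>>> taskmanager.debug.memory.startLogThread: true
>>>>>>> ```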
>>>>>>>
>>>>>>> Thanks a lot for your help!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <cre...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> I've seen a similar issue while running successive Flink SQL batches
>>>>>>> on 1.4. In my case, the Job Manager would fail with the log output about
>>>>>>> unreachability (with an additional statement about something going
>>>>>>> "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where
>>>>>>> everything works perfectly, but we will try again soon on 1.4. When we
>>>>>>> do, I will post the actual log output.
>>>>>>>
>>>>>>> This was on YARN in AWS, with akka.ask.timeout = 60s.
>>>>>>>
>>>>>>> On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <
>>>>>>> ashish...@yahoo.com> wrote:
>>>>>>>
>>>>>>> I haven’t gotten much further with this. It doesn’t look GC related -
>>>>>>> at least the GC counters were not that atrocious. However, my main
>>>>>>> concern is: once the load subsides, why aren’t the TM and JM connecting
>>>>>>> again? That doesn’t look normal. I could definitely tell the JM was
>>>>>>> listening on the port, and from the logs it does appear the TM is trying
>>>>>>> to message JM that is still alive.
>>>>>>>
>>>>>>> Thanks, Ashish
>>>>>>>
>>>>>>> On Jan 23, 2018, at 12:31 PM, Lasse Nedergaard <
>>>>>>> lassenederga...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi.
>>>>>>>
>>>>>>> Did you find a reason for the detaching?
>>>>>>> I sometimes see the same on our system running Flink 1.4 on DC/OS. I
>>>>>>> have enabled taskmanager.debug.memory.startLogThread for debugging.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Lasse Nedergaard
>>>>>>>
>>>>>>>
>>>>>>> On Jan 20, 2018, at 12:57, Kien Truong <
>>>>>>> duckientru...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> You should enable and check your garbage collection log.
>>>>>>>
>>>>>>> We've encountered a case where a Task Manager got disassociated due to
>>>>>>> a long GC pause.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Kien
>>>>>>> On 1/20/2018 1:27 AM, ashish pok wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> We have hit some load-related issues and were wondering if anyone
>>>>>>> has some suggestions. We are noticing task managers and job managers
>>>>>>> being detached from each other under load and never really syncing up
>>>>>>> again. As a result, the Flink session shows 0 slots available for
>>>>>>> processing. Even though apps are configured to restart, it isn't really
>>>>>>> helping, as there are no slots available to run the apps.
>>>>>>>
>>>>>>>
>>>>>>> Here are excerpts from the logs that seemed relevant. (I am trimming
>>>>>>> out the rest of the logs for brevity.)
>>>>>>>
>>>>>>> *Job Manager:*
>>>>>>> 2018-01-19 12:38:00,423 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>>>>
>>>>>>> 2018-01-19 12:38:00,792 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Maximum heap size: 16384 MiBytes
>>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  Hadoop version: 2.6.5
>>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
>>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms16384m
>>>>>>> 2018-01-19 12:38:00,794 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx16384m
>>>>>>> 2018-01-19 12:38:00,795 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:+UseG1GC
>>>>>>>
>>>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>>>>>> 2018-01-19 12:38:00,908 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>>>
>>>>>>>
>>>>>>> 2018-01-19 12:53:34,671 WARN  akka.remote.RemoteWatcher                        - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
>>>>>>> 2018-01-19 12:53:34,676 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.
>>>>>>>
>>>>>>> -- So once Flink session boots up, we are hitting it with pretty
>>>>>>> heavy load, which typically results in the WARN above
>>>>>>>
>>>>>>> *Task Manager:*
>>>>>>> 2018-01-19 12:38:01,002 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop version: 2.6.5
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms16384M
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx16384M
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
>>>>>>> 2018-01-19 12:38:01,367 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:+UseG1GC
>>>>>>>
>>>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
>>>>>>> 2018-01-19 12:38:01,392 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 16384
>>>>>>>
>>>>>>>
>>>>>>> 2018-01-19 12:54:48,626 WARN  akka.remote.RemoteWatcher                        - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
>>>>>>> 2018-01-19 12:54:48,690 INFO  akka.remote.Remoting                         - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
>>>>>>> 2018-01-19 12:54:48,774 WARN  akka.remote.Remoting                       - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
>>>>>>> 2018-01-19 12:54:48,833 WARN  akka.remote.Remoting                         - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>>>> <bunch of ERRORs on operations not shutdown properly - assuming
>>>>>>> because JM is unreachable>
>>>>>>>
>>>>>>> 2018-01-19 12:56:51,244 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
>>>>>>> 2018-01-19 12:56:51,253 WARN  akka.remote.Remoting                         - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]
>>>>>>>
>>>>>>> So bottom line is, JM and TM couldn't communicate under load, which
>>>>>>> is obviously not good. I tried to bump up akka.tcp.timeout as well but
>>>>>>> it didn't help either. So my question here is: after all processing is
>>>>>>> halted and there is no new data being picked up, shouldn't this
>>>>>>> environment self-heal? Are there any other things I can look at other
>>>>>>> than extending timeouts?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Ashish
>>>>>>>
