I've found that occasionally one of my TaskManagers experiences a GC pause
of 40-50 seconds, and I have no idea why.
I profiled one of the machines using JProfiler and everything looks fine:
no memory leaks, and memory usage is low.
However, I cannot predict which of the machines will get the 40-50
second pause, and I cannot profile all of them all the time.
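
A lightweight alternative to attaching a profiler everywhere is to let each
JVM report its own GC time through the standard java.lang.management API.
The sketch below is only an illustration: the class name GcPauseWatcher, the
thread name, and the threshold and period values are hypothetical and are not
part of Flink, and where to start it inside a TaskManager (for example from
an operator's open() method) is an assumption left to the job author.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: warns when a collector's cumulative GC time jumps between samples. */
final class GcPauseWatcher {
    // Cumulative collection time (ms) seen per collector at the previous sample.
    private final Map<String, Long> lastTotals = new HashMap<>();
    private final long thresholdMs;

    GcPauseWatcher(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /** GC time accumulated since the previous sample; clamped at 0 because the JVM may report -1. */
    static long delta(long previousMs, long currentMs) {
        return Math.max(0L, currentMs - previousMs);
    }

    /**
     * Takes one sample of every collector and returns a warning per collector
     * whose GC time grew by more than the threshold since the last sample.
     * The first sample compares against 0, i.e. against JVM start.
     */
    List<String> sampleOnce() {
        List<String> warnings = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long total = gc.getCollectionTime(); // cumulative ms, -1 if undefined
            long spent = delta(lastTotals.getOrDefault(gc.getName(), 0L), total);
            lastTotals.put(gc.getName(), total);
            if (spent > thresholdMs) {
                warnings.add(String.format(
                        "%s spent %d ms in GC since the last sample (collections so far: %d)",
                        gc.getName(), spent, gc.getCollectionCount()));
            }
        }
        return warnings;
    }

    /** Samples periodically on a daemon thread and prints warnings to stderr. */
    ScheduledExecutorService start(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "gc-pause-watcher");
            t.setDaemon(true); // must not keep the JVM alive
            return t;
        });
        scheduler.scheduleAtFixedRate(
                () -> sampleOnce().forEach(System.err::println),
                periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling new GcPauseWatcher(5_000).start(1_000) once per JVM would log any
one-second window in which a collector accumulated more than five seconds of
GC time. The watcher thread is itself frozen during a stop-the-world pause,
so the warning only appears after the pause ends, but it does end up in that
machine's own log without a profiler attached.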

Any suggestions?

On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com> wrote:

> In Flink 1.10, there's a huge change in the memory management compared to
> previous versions. This could be related to your observations, because with
> the same configurations, it is possible that there's less JVM heap space
> (with more off-heap memory). Please take a look at this migration guide [1].
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>
> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Thanks for the suggestions!
>>
>> > I recently tried 1.10 and see this error frequently, and I don't have
>> the same issue when running with 1.9.1.
>> I downgraded to Flink 1.9, and there is no change in how often the
>> heartbeat timeout occurs.
>>
>>
>> >
>>
>>    - Probably the most straightforward way is to try increasing the
>>    timeout to see if that helps. You can leverage the configuration option
>>    `heartbeat.timeout`[1]. The default is 50s.
>>    - It might be helpful to share your configuration setups (e.g., the
>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>    and all the loaded configurations.
>>    - You may want to look into the GC logs in addition to the metrics.
>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>    recent metrics due to the process not responding to the metric querying
>>    services.
>>    - You may also look into the status of the JM process. If the JM is under
>>    significant GC pressure, the heartbeat message from the TM might not be
>>    handled in time, before the timeout check.
>>    - Are there any metrics monitoring the network conditions between the
>>    JM and the timed-out TM? Possibly any jitter?
>>
>>
>> Weirdly enough, I did manage to find a problem with the timed-out
>> TaskManagers, which I missed the last time I checked: the timed-out
>> TaskManager is always the one with the highest GC time (young generation).
>> I only see it now that I run with G1GC; with the previous GC this wasn't
>> the case.
>>
>> Does anyone know what can cause high GC time and how to mitigate this?
>>
>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Ori,
>>>
>>> Here are some suggestions from my side.
>>>
>>>    - Probably the most straightforward way is to try increasing the
>>>    timeout to see if that helps. You can leverage the configuration option
>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>    - It might be helpful to share your configuration setups (e.g., the
>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>    and all the loaded configurations.
>>>    - You may want to look into the GC logs in addition to the metrics.
>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>    recent metrics due to the process not responding to the metric querying
>>>    services.
>>>    - You may also look into the status of the JM process. If the JM is
>>>    under significant GC pressure, the heartbeat message from the TM might
>>>    not be handled in time, before the timeout check.
>>>    - Are there any metrics monitoring the network conditions between the
>>>    JM and the timed-out TM? Possibly any jitter?
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>
>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm running Flink 1.10 on EMR, reading from Kafka with 189
>>>> partitions, and I have a parallelism of 189.
>>>>
>>>> I'm currently running with RocksDB, with checkpointing disabled. My state
>>>> size is approx. 500 GB.
>>>>
>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors for
>>>> no apparent reason.
>>>>
>>>> I checked the container that gets the timeout for GC pauses, heap memory,
>>>> direct memory, mapped memory, off-heap memory, CPU load, network load, total
>>>> out-records, total in-records, backpressure, and everything else I can think
>>>> of. But all those metrics show nothing unusual: the container has around
>>>> average values for all of them, and there are a lot of other containers
>>>> which score higher.
>>>>
>>>> All the metrics are very low because every TaskManager runs on an
>>>> r5.2xlarge machine of its own.
>>>>
>>>> I've been trying to debug this for days and I cannot find any explanation
>>>> for it.
>>>>
>>>> Can someone explain why it's happening?
>>>>
>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Thanks
>>>>
>>>
