Maybe you can share the log and gc-log of the problematic TaskManager? See
if we can find any clue.
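
In case GC logging isn't enabled on the TaskManagers yet, here is one
possible way to turn it on (just a sketch, assuming Java 8 and that you
configure it via flink-conf.yaml; the log path is a placeholder):

    env.java.opts.taskmanager: -Xloggc:/var/log/flink/gc-taskmanager.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime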

Thank you~

Xintong Song



On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com> wrote:

> I've found out that sometimes one of my TaskManagers experiences a GC
> pause of 40-50 seconds, and I have no idea why.
> I profiled one of the machines using JProfiler and everything looks fine:
> no memory leaks, and memory usage is low.
> However, I cannot anticipate which of the machines will hit the 40-50
> second pause, and I also cannot profile all of them all the time.
>
> Any suggestions?
>
> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>> In Flink 1.10, there's a huge change in memory management compared to
>> previous versions. This could be related to your observations: with the
>> same configuration, it is possible that there's less JVM heap space (and
>> more off-heap memory). Please take a look at this migration guide [1].
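>>
>> For reference, a minimal sketch of how the heap can be pinned explicitly
>> under the 1.10 memory model (the sizes below are placeholders, not
>> recommendations, and need to fit your machines):
>>
>>     taskmanager.memory.task.heap.size: 8g
>>     taskmanager.memory.managed.size: 4g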
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>
>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Thanks for the suggestions!
>>>
>>> > I recently tried 1.10 and see this error frequently, and I don't have
>>> > the same issue when running with 1.9.1.
>>> I did downgrade to Flink 1.9, and there is certainly no change in the
>>> occurrence of the heartbeat timeouts.
>>>
>>>
>>> >
>>>
>>>    - Probably the most straightforward way is to try increasing the
>>>    timeout to see if that helps. You can leverage the configuration option
>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>    - It might be helpful to share your configuration setups (e.g., the
>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>    share the beginning part of your JM/TM logs, including the JVM parameters
>>>    and all the loaded configurations.
>>>    - You may want to look into the GC logs in addition to the metrics.
>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>    recent metrics due to the process not responding to the metric querying
>>>    services.
>>>    - You may also look into the status of the JM process. If JM is
>>>    under significant GC pressure, it could also happen that the heartbeat
>>>    message from TM is not timely handled before the timeout check.
>>>    - Are there any metrics monitoring the network condition between the
>>>    JM and the timed-out TM? Possibly any jitter?
>>>
>>>
>>> Weirdly enough, I did manage to find a pattern in the timed-out
>>> TaskManagers, one that slipped past me the last time I checked: the
>>> timed-out TaskManager is always the one with the maximum GC time (young
>>> generation). I only see this now that I run with G1GC; with the previous
>>> GC it wasn't the case.
>>>
>>> Does anyone know what can cause such high GC times and how to mitigate
>>> them?
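>>>
>>> (For context, in case these turn out to be young-generation pauses: a
>>> sketch of the kind of JVM options usually looked at first, assuming Java 8
>>> and that GC options are passed via `env.java.opts.taskmanager`; the values
>>> are placeholders, not recommendations.)
>>>
>>>     env.java.opts.taskmanager: -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+PrintGCDetails -XX:+PrintAdaptiveSizePolicy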
>>>
>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <tonysong...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ori,
>>>>
>>>> Here are some suggestions from my side.
>>>>
>>>>    - Probably the most straightforward way is to try increasing the
>>>>    timeout to see if that helps. You can leverage the configuration option
>>>>    `heartbeat.timeout`[1]. The default is 50s (see the sketch after this
>>>>    list).
>>>>    - It might be helpful to share your configuration setups (e.g., the
>>>>    TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is
>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>    parameters and all the loaded configurations.
>>>>    - You may want to look into the GC logs in addition to the metrics.
>>>>    In case of a CMS GC stop-the-world, you may not be able to see the most
>>>>    recent metrics due to the process not responding to the metric querying
>>>>    services.
>>>>    - You may also look into the status of the JM process. If JM is
>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>    message from TM is not timely handled before the timeout check.
>>>>    - Are there any metrics monitoring the network condition between the
>>>>    JM and the timed-out TM? Possibly any jitter?
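>>>>
>>>> Regarding `heartbeat.timeout` from the first point, a minimal
>>>> flink-conf.yaml sketch (the value is in milliseconds; 120000 below is
>>>> only an illustrative placeholder, not a recommendation):
>>>>
>>>>     heartbeat.timeout: 120000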
>>>>
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>
>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running Flink 1.10 on EMR, reading from Kafka with 189 partitions
>>>>> and a parallelism of 189.
>>>>>
>>>>> I'm currently running with RocksDB, with checkpointing disabled. My
>>>>> state size is approx. 500 GB.
>>>>>
>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors for no
>>>>> apparent reason.
>>>>>
>>>>> I check the container that gets the timeout for GC pauses, heap memory,
>>>>> direct memory, mapped memory, off-heap memory, CPU load, network load,
>>>>> total out-records, total in-records, backpressure, and everything else I
>>>>> can think of. All those metrics show nothing unusual; the container has
>>>>> roughly average values, and there are plenty of other containers that
>>>>> score higher.
>>>>>
>>>>> All the metrics are very low because every TaskManager runs alone on an
>>>>> r5.2xlarge machine.
>>>>>
>>>>> I've been trying to debug this for days and I cannot find any
>>>>> explanation for it.
>>>>>
>>>>> Can someone explain why it's happening?
>>>>>
>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> Thanks
>>>>>
>>>>
