Thank you very much for your analysis.

When I said there was no memory leak - I meant that from the specific
TaskManager I monitored in real-time using JProfiler.
Unfortunately, this problem occurs only in 1 of the TaskManager and you
cannot anticipate which. So when you pick a TM to profile at random -
everything looks fine.

I'm running the job again with Java FlightRecorder now, and I hope I'll
find the reason for the memory leak.

Thanks!

On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Thanks, Ori
>
> From the log, it looks like there IS a memory leak.
>
> At 10:12:53 there was the last "successfull" gc when 13Gb freed in
> 0.4653809 secs:
> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M
> Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>
> Then the heap grew from 10G to 28G with GC not being able to free up
> enough space:
> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap:
> 12591.0M(28960.0M)->11247.0M(28960.0M)]
> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap:
> 12103.0M(28960.0M)->11655.0M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 12929.0M(28960.0M)->12467.0M(28960.0M)]
> ... ...
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 28042.6M(28960.0M)->27220.6M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap:
> 28494.5M(28960.0M)->28720.6M(28960.0M)]
> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap:
> 28944.6M(28960.0M)->28944.6M(28960.0M)]
>
> Until 10:15:12 when GC freed almost 4G - but it took 51 seconds and
> heartbeat timed out:
> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure)
>  28944M->26018M(28960M), 51.5256128 secs]
>   [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap:
> 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace:
> 113556K->112729K(1150976K)]
>   [Times: user=91.08 sys=0.06, real=51.53 secs]
> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor  - The heartbeat of
> JobManager with id bc59ba6a
>
> No substantial amount memory was freed after that.
>
> If this memory usage pattern is expected, I'd suggest to:
> 1. increase heap size
> 2. play with PrintStringDeduplicationStatistics and UseStringDeduplication
> flags - probably string deduplication is making G1 slower then CMS
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>
>>
>> Thanks!
>>
>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Maybe you can share the log and gc-log of the problematic TaskManager?
>>> See if we can find any clue.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> I've found out that sometimes one of my TaskManagers experiences a GC
>>>> pause of 40-50 seconds and I have no idea why.
>>>> I profiled one of the machines using JProfiler and everything looks
>>>> fine. No memory leaks, memory is low.
>>>> However, I cannot anticipate which of the machines will get the 40-50
>>>> seconds pause and I also cannot profile all of them all the time.
>>>>
>>>> Any suggestions?
>>>>
>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>> In Flink 1.10, there's a huge change in the memory management compared
>>>>> to previous versions. This could be related to your observations, because
>>>>> with the same configurations, it is possible that there's less JVM heap
>>>>> space (with more off-heap memory). Please take a look at this migration
>>>>> guide [1].
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>
>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the suggestions!
>>>>>>
>>>>>> > i recently tried 1.10 and see this error frequently. and i dont
>>>>>> have the same issue when running with 1.9.1
>>>>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>>>>> occurrences in the heartbeat timeout
>>>>>>
>>>>>>
>>>>>> >
>>>>>>
>>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>>    timeout to see if that helps. You can leverage the configuration 
>>>>>> option
>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest 
>>>>>> way is
>>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>>    parameters and all the loaded configurations.
>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to 
>>>>>> see the
>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>    querying services.
>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>    under significant GC pressure, it could also happen that the heartbeat
>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>    - Is there any metrics monitoring the network condition between
>>>>>>    the JM and timeouted TM? Possibly any jitters?
>>>>>>
>>>>>>
>>>>>> Weirdly enough, I did manage to find a problem with the timed out
>>>>>> TaskManagers, which slipped away the last time I checked: The timed out
>>>>>> TaskManager is always the one with the max. GC time (young generation). I
>>>>>> see it only now that I run with G1GC, but with the previous GC it wasn't
>>>>>> the case.
>>>>>>
>>>>>> Does anyone know what can cause high GC time and how to mitigate this?
>>>>>>
>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <tonysong...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Ori,
>>>>>>>
>>>>>>> Here are some suggestions from my side.
>>>>>>>
>>>>>>>    - Probably the most straightforward way is to try increasing the
>>>>>>>    timeout to see if that helps. You can leverage the configuration 
>>>>>>> option
>>>>>>>    `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>    - It might be helpful to share your configuration setups (e.g.,
>>>>>>>    the TM resources, JVM parameters, timeout, etc.). Maybe the easiest 
>>>>>>> way is
>>>>>>>    to share the beginning part of your JM/TM logs, including the JVM
>>>>>>>    parameters and all the loaded configurations.
>>>>>>>    - You may want to look into the GC logs in addition to the
>>>>>>>    metrics. In case of a CMS GC stop-the-world, you may not be able to 
>>>>>>> see the
>>>>>>>    most recent metrics due to the process not responding to the metric
>>>>>>>    querying services.
>>>>>>>    - You may also look into the status of the JM process. If JM is
>>>>>>>    under significant GC pressure, it could also happen that the 
>>>>>>> heartbeat
>>>>>>>    message from TM is not timely handled before the timeout check.
>>>>>>>    - Is there any metrics monitoring the network condition between
>>>>>>>    the JM and timeouted TM? Possibly any jitters?
>>>>>>>
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm running Flink 1.10 on EMR and reading from Kafka with 189
>>>>>>>> partitions and I have parallelism of 189.
>>>>>>>>
>>>>>>>> Currently running with RocksDB, with checkpointing disabled. My
>>>>>>>> state size is appx. 500gb.
>>>>>>>>
>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors
>>>>>>>> with no apparent reason.
>>>>>>>>
>>>>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>>>>> memory, direct memory, mapped memory, offheap memory, CPU load, network
>>>>>>>> load, total out-records, total in-records, backpressure, and 
>>>>>>>> everything I
>>>>>>>> can think of. But all those metrics show that there's nothing unusual, 
>>>>>>>> and
>>>>>>>> it has around average values for all those metrics. There are a lot of
>>>>>>>> other containers which score higher.
>>>>>>>>
>>>>>>>> All the metrics are very low because every TaskManager runs on a
>>>>>>>> r5.2xlarge machine alone.
>>>>>>>>
>>>>>>>> I'm trying to debug this for days and I cannot find any explanation
>>>>>>>> for it.
>>>>>>>>
>>>>>>>> Can someone explain why it's happening?
>>>>>>>>
>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager
>>>>>>>> with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>     at org.apache.flink.runtime.jobmaster.
>>>>>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(
>>>>>>>> JobMaster.java:1147)
>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>>>>>>> HeartbeatMonitorImpl.java:109)
>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(
>>>>>>>> Executors.java:511)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>> .handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>> .handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>>>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor
>>>>>>>> .handleMessage(AkkaRpcActor.java:152)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26
>>>>>>>> )
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21
>>>>>>>> )
>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction
>>>>>>>> .scala:123)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements
>>>>>>>> .scala:21)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>> .scala:170)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>> .scala:171)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction
>>>>>>>> .scala:171)
>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:
>>>>>>>> 225)
>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask
>>>>>>>> .java:260)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(
>>>>>>>> ForkJoinPool.java:1339)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool
>>>>>>>> .java:1979)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(
>>>>>>>> ForkJoinWorkerThread.java:107)
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>

Reply via email to