Thank you very much for your analysis. When I said there was no memory leak, I was referring to the specific TaskManager I monitored in real time using JProfiler. Unfortunately, this problem occurs in only one of the TaskManagers, and you cannot anticipate which one. So when you pick a TM to profile at random, everything looks fine.
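For reference, this is roughly how a recording can be enabled on every TaskManager at once, so that whichever TM degrades is captured without knowing it in advance. This is only a sketch: it assumes Java 8, it assumes env.java.opts.taskmanager is not already in use in flink-conf.yaml, and the duration and file name are placeholders (on Oracle JDK 8, -XX:+UnlockCommercialFeatures is also required):

    env.java.opts.taskmanager: -XX:+FlightRecorder -XX:StartFlightRecording=duration=60m,filename=/tmp/flink-tm-recording.jfr,settings=profile

The resulting .jfr file then only needs to be pulled from the one TM that actually hit the long pause.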
I'm running the job again with Java Flight Recorder now, and I hope I'll find the reason for the memory leak. Thanks!

On Thu, Jul 2, 2020 at 3:42 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Thanks, Ori
>
> From the log, it looks like there IS a memory leak.
>
> At 10:12:53 there was the last "successful" GC, when 13 GB was freed in 0.4653809 secs:
> [Eden: 17336.0M(17336.0M)->0.0B(2544.0M) Survivors: 40960.0K->2176.0M Heap: 23280.3M(28960.0M)->10047.0M(28960.0M)]
>
> Then the heap grew from 10 GB to 28 GB, with GC not being able to free up enough space:
> [Eden: 2544.0M(2544.0M)->0.0B(856.0M) Survivors: 2176.0M->592.0M Heap: 12591.0M(28960.0M)->11247.0M(28960.0M)]
> [Eden: 856.0M(856.0M)->0.0B(1264.0M) Survivors: 592.0M->184.0M Heap: 12103.0M(28960.0M)->11655.0M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 12929.0M(28960.0M)->12467.0M(28960.0M)]
> ...
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28042.6M(28960.0M)->27220.6M(28960.0M)]
> [Eden: 1264.0M(1264.0M)->0.0B(1264.0M) Survivors: 184.0M->184.0M Heap: 28494.5M(28960.0M)->28720.6M(28960.0M)]
> [Eden: 224.0M(1264.0M)->0.0B(1448.0M) Survivors: 184.0M->0.0B Heap: 28944.6M(28960.0M)->28944.6M(28960.0M)]
>
> Until 10:15:12, when GC freed almost 4 GB - but it took 51 seconds and the heartbeat timed out:
> 2020-07-01T10:15:12.869+0000: [Full GC (Allocation Failure) 28944M->26018M(28960M), 51.5256128 secs]
> [Eden: 0.0B(1448.0M)->0.0B(1448.0M) Survivors: 0.0B->0.0B Heap: 28944.6M(28960.0M)->26018.9M(28960.0M)], [Metaspace: 113556K->112729K(1150976K)]
> [Times: user=91.08 sys=0.06, real=51.53 secs]
> 2020-07-01T10:16:04.395+0000: [GC concurrent-mark-abort]
> 10:16:04.398 [flink-akka.actor.default-dispatcher-21] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - The heartbeat of JobManager with id bc59ba6a
>
> No substantial amount of memory was freed after that.
>
> If this memory usage pattern is expected, I'd suggest to:
> 1. increase the heap size
> 2. play with the PrintStringDeduplicationStatistics and UseStringDeduplication flags - probably string deduplication is making G1 slower than CMS
>
> Regards,
> Roman
>
>
> On Thu, Jul 2, 2020 at 10:11 AM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi,
>>
>> I'd be happy to :) Attached is a TaskManager log which timed out.
>>
>> Thanks!
>>
>> On Thu, Jul 2, 2020 at 4:21 AM Xintong Song <tonysong...@gmail.com> wrote:
>>
>>> Maybe you can share the log and GC log of the problematic TaskManager?
>>> See if we can find any clue.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>> On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> I've found out that sometimes one of my TaskManagers experiences a GC pause of 40-50 seconds, and I have no idea why.
>>>> I profiled one of the machines using JProfiler and everything looks fine. No memory leaks, and memory usage is low.
>>>> However, I cannot anticipate which of the machines will get the 40-50 second pause, and I also cannot profile all of them all the time.
>>>>
>>>> Any suggestions?
>>>>
>>>> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>
>>>>> In Flink 1.10, there's a huge change in the memory management compared to previous versions. This could be related to your observations, because with the same configuration it is possible that there's less JVM heap space (with more off-heap memory). Please take a look at this migration guide [1].
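To make the two suggestions above concrete (increasing the heap, and trying G1 string deduplication), here is a minimal flink-conf.yaml sketch using the Flink 1.10 memory options from the migration guide. All sizes are illustrative placeholders rather than recommendations, and they assume nothing else in the configuration already constrains the memory budget; on Flink 1.9 the corresponding knob is taskmanager.heap.size:

    # Either give the whole TaskManager process an explicit budget...
    taskmanager.memory.process.size: 56g
    # ...or pin the JVM heap and the RocksDB managed memory directly instead:
    # taskmanager.memory.task.heap.size: 30g
    # taskmanager.memory.managed.size: 16g
    # G1 string deduplication, with statistics printed to the GC log (Java 8 flag names)
    env.java.opts.taskmanager: -XX:+UseStringDeduplication -XX:+PrintStringDeduplicationStatistics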
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
>>>>>
>>>>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the suggestions!
>>>>>>
>>>>>> > I recently tried 1.10 and see this error frequently, and I don't have the same issue when running with 1.9.1
>>>>>> I did downgrade to Flink 1.9, and there's certainly no change in the occurrence of the heartbeat timeouts.
>>>>>>
>>>>>>    - Probably the most straightforward way is to try increasing the timeout to see if that helps. You can leverage the configuration option `heartbeat.timeout`[1]. The default is 50s.
>>>>>>    - It might be helpful to share your configuration setup (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to share the beginning part of your JM/TM logs, including the JVM parameters and all the loaded configurations.
>>>>>>    - You may want to look into the GC logs in addition to the metrics. In case of a CMS GC stop-the-world, you may not be able to see the most recent metrics due to the process not responding to the metric querying services.
>>>>>>    - You may also look into the status of the JM process. If the JM is under significant GC pressure, it could also happen that the heartbeat message from the TM is not handled in time before the timeout check.
>>>>>>    - Are there any metrics monitoring the network condition between the JM and the timed-out TM? Possibly any jitters?
>>>>>>
>>>>>> Weirdly enough, I did manage to find a problem with the timed-out TaskManagers, which slipped past me the last time I checked: the timed-out TaskManager is always the one with the maximum GC time (young generation). I see it only now that I run with G1GC; with the previous GC it wasn't the case.
>>>>>>
>>>>>> Does anyone know what can cause high GC time and how to mitigate it?
>>>>>>
>>>>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Ori,
>>>>>>>
>>>>>>> Here are some suggestions from my side.
>>>>>>>
>>>>>>>    - Probably the most straightforward way is to try increasing the timeout to see if that helps. You can leverage the configuration option `heartbeat.timeout`[1]. The default is 50s.
>>>>>>>    - It might be helpful to share your configuration setup (e.g., the TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to share the beginning part of your JM/TM logs, including the JVM parameters and all the loaded configurations.
>>>>>>>    - You may want to look into the GC logs in addition to the metrics. In case of a CMS GC stop-the-world, you may not be able to see the most recent metrics due to the process not responding to the metric querying services.
>>>>>>>    - You may also look into the status of the JM process. If the JM is under significant GC pressure, it could also happen that the heartbeat message from the TM is not handled in time before the timeout check.
>>>>>>>    - Are there any metrics monitoring the network condition between the JM and the timed-out TM? Possibly any jitters?
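A minimal sketch of the first item in the list above; the value is an example rather than a recommendation (heartbeat.timeout is given in milliseconds and defaults to 50000):

    heartbeat.timeout: 120000
    # the interval between heartbeats can usually stay at its default (10000 ms)
    # heartbeat.interval: 10000

Raising the timeout mostly buys headroom while the underlying GC pauses are investigated, but it does help confirm whether the pauses and the timeouts are correlated.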
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm running Flink 1.10 on EMR, reading from Kafka with 189 partitions, and I have a parallelism of 189.
>>>>>>>>
>>>>>>>> I'm currently running with RocksDB, with checkpointing disabled. My state size is approx. 500 GB.
>>>>>>>>
>>>>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors for no apparent reason.
>>>>>>>>
>>>>>>>> I check the container that gets the timeout for GC pauses, heap memory, direct memory, mapped memory, off-heap memory, CPU load, network load, total out-records, total in-records, backpressure, and everything else I can think of. But all those metrics show that there's nothing unusual: the container has around-average values for all of them, and there are a lot of other containers which score higher.
>>>>>>>>
>>>>>>>> All the metrics are very low because every TaskManager runs alone on an r5.2xlarge machine.
>>>>>>>>
>>>>>>>> I've been trying to debug this for days and I cannot find any explanation for it.
>>>>>>>>
>>>>>>>> Can someone explain why it's happening?
>>>>>>>>
>>>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>>>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>
>>>>>>>> Thanks
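For completeness, a hedged sketch of the JVM options that produce per-TaskManager GC logs like the excerpts quoted earlier in the thread. It assumes Java 8 flag names; env.java.opts.taskmanager is standard Flink configuration, and the log path is purely illustrative:

    env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/var/log/flink/taskmanager-gc.log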