Maybe you can share the log and GC log of the problematic TaskManager, so we can see if we find any clue?
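If GC logging isn't enabled yet, something along these lines in flink-conf.yaml should produce a GC log you can share (just a sketch, assuming a Java 8 TaskManager JVM; the log path is a placeholder):

    # flink-conf.yaml -- extra JVM options for the TaskManager processes
    env.java.opts.taskmanager: -Xloggc:/var/log/flink/taskmanager-gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime

(On Java 9+ the unified logging syntax, -Xlog:gc*, would be used instead.)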
Thank you~

Xintong Song

On Wed, Jul 1, 2020 at 8:11 PM Ori Popowski <ori....@gmail.com> wrote:

> I've found out that sometimes one of my TaskManagers experiences a GC
> pause of 40-50 seconds, and I have no idea why.
> I profiled one of the machines using JProfiler and everything looks fine.
> No memory leaks, memory is low.
> However, I cannot anticipate which of the machines will get the 40-50
> second pause, and I also cannot profile all of them all the time.
>
> Any suggestions?
>
> On Mon, Jun 29, 2020 at 4:44 AM Xintong Song <tonysong...@gmail.com> wrote:
>
>> In Flink 1.10, there's a huge change in the memory management compared to
>> previous versions. This could be related to your observations, because with
>> the same configuration it is possible that there's less JVM heap space
>> (and more off-heap memory). Please take a look at this migration guide [1].
>>
>> Thank you~
>>
>> Xintong Song
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html
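(Purely as an illustration of what the new 1.10 model looks like, not a recommendation: the heap and managed/off-heap parts of a TaskManager can be sized explicitly in flink-conf.yaml, roughly as below. The sizes are made-up placeholders; see the migration guide for how your existing settings map onto these options.)

    # flink-conf.yaml -- Flink 1.10 memory options (sizes are placeholders)
    taskmanager.memory.process.size: 16g      # total memory of the TM process/container
    taskmanager.memory.task.heap.size: 8g     # JVM heap available to user code
    taskmanager.memory.managed.size: 4g       # managed (off-heap) memory, e.g. for RocksDB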
>>
>> On Sun, Jun 28, 2020 at 10:12 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> Thanks for the suggestions!
>>>
>>> > I recently tried 1.10 and see this error frequently, and I don't have
>>> > the same issue when running with 1.9.1
>>>
>>> I did downgrade to Flink 1.9 and there's certainly no change in the
>>> occurrences of the heartbeat timeout.
>>>
>>> > - Probably the most straightforward way is to try increasing the
>>> > timeout to see if that helps. You can leverage the configuration option
>>> > `heartbeat.timeout` [1]. The default is 50s.
>>> > - It might be helpful to share your configuration setup (e.g., the
>>> > TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>> > share the beginning part of your JM/TM logs, including the JVM parameters
>>> > and all the loaded configuration.
>>> > - You may want to look into the GC logs in addition to the metrics.
>>> > In case of a CMS GC stop-the-world, you may not be able to see the most
>>> > recent metrics because the process does not respond to the metric
>>> > querying services.
>>> > - You may also look into the status of the JM process. If the JM is
>>> > under significant GC pressure, it could also happen that the heartbeat
>>> > message from the TM is not handled before the timeout check.
>>> > - Are there any metrics monitoring the network condition between the
>>> > JM and the timed-out TM? Possibly any jitter?
>>>
>>> Weirdly enough, I did manage to find a problem with the timed-out
>>> TaskManagers, which slipped past me the last time I checked: the timed-out
>>> TaskManager is always the one with the maximum GC time (young generation).
>>> I see it only now that I run with G1GC; with the previous GC it wasn't
>>> the case.
>>>
>>> Does anyone know what can cause high GC time and how to mitigate it?
>>>
>>> On Sun, Jun 28, 2020 at 5:04 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>
>>>> Hi Ori,
>>>>
>>>> Here are some suggestions from my side.
>>>>
>>>> - Probably the most straightforward way is to try increasing the
>>>> timeout to see if that helps. You can leverage the configuration option
>>>> `heartbeat.timeout` [1]. The default is 50s.
>>>> - It might be helpful to share your configuration setup (e.g., the
>>>> TM resources, JVM parameters, timeout, etc.). Maybe the easiest way is to
>>>> share the beginning part of your JM/TM logs, including the JVM parameters
>>>> and all the loaded configuration.
>>>> - You may want to look into the GC logs in addition to the metrics.
>>>> In case of a CMS GC stop-the-world, you may not be able to see the most
>>>> recent metrics because the process does not respond to the metric
>>>> querying services.
>>>> - You may also look into the status of the JM process. If the JM is
>>>> under significant GC pressure, it could also happen that the heartbeat
>>>> message from the TM is not handled before the timeout check.
>>>> - Are there any metrics monitoring the network condition between the
>>>> JM and the timed-out TM? Possibly any jitter?
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout
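(For reference, the timeout bump from the first suggestion above is a one-line change in flink-conf.yaml; the value is in milliseconds and the number below is only an example:)

    # flink-conf.yaml -- heartbeat timeout, default 50000 (50 s)
    heartbeat.timeout: 120000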
>>>>
>>>> On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm running Flink 1.10 on EMR, reading from Kafka with 189 partitions,
>>>>> and I have a parallelism of 189.
>>>>>
>>>>> I'm currently running with RocksDB, with checkpointing disabled. My
>>>>> state size is approx. 500 GB.
>>>>>
>>>>> I'm getting sporadic "Heartbeat of TaskManager timed out" errors for
>>>>> no apparent reason.
>>>>>
>>>>> I check the container that gets the timeout for GC pauses, heap
>>>>> memory, direct memory, mapped memory, off-heap memory, CPU load, network
>>>>> load, total out-records, total in-records, backpressure, and everything
>>>>> I can think of. But all those metrics show nothing unusual, and the
>>>>> container has around average values for all of them. There are a lot of
>>>>> other containers which score higher.
>>>>>
>>>>> All the metrics are very low because every TaskManager runs alone on an
>>>>> r5.2xlarge machine.
>>>>>
>>>>> I've been trying to debug this for days and I cannot find any
>>>>> explanation for it.
>>>>>
>>>>> Can someone explain why it's happening?
>>>>>
>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
>>>>> id container_1593074931633_0011_01_000127 timed out.
>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> Thanks