Hi Ori,

Here are some suggestions from my side.

- Probably the most straightforward approach is to increase the timeout and
  see if that helps. You can use the configuration option
  `heartbeat.timeout` [1]. The default is 50s.
- It might be helpful to share your configuration setup (e.g., the TM
  resources, JVM parameters, timeout, etc.). The easiest way is probably to
  share the beginning of your JM/TM logs, which includes the JVM parameters
  and all the loaded configurations.
- You may want to look into the GC logs in addition to the metrics. During a
  stop-the-world pause (e.g., with CMS GC), the process may not respond to
  the metric querying services, so the most recent metrics may be missing.
- You may also look into the status of the JM process. If the JM is under
  significant GC pressure, the heartbeat message from the TM might not be
  handled in time, before the timeout check.
- Are there any metrics monitoring the network condition between the JM and
  the timed-out TM? Any jitter, perhaps?

Thank you~

Xintong Song

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#heartbeat-timeout

On Thu, Jun 25, 2020 at 11:15 PM Ori Popowski <ori....@gmail.com> wrote:

> Hello,
>
> I'm running Flink 1.10 on EMR and reading from Kafka with 189 partitions
> and I have parallelism of 189.
>
> Currently running with RocksDB, with checkpointing disabled. My state size
> is appx. 500gb.
>
> I'm getting sporadic "Heartbeat of TaskManager timed out" errors with no
> apparent reason.
>
> I check the container that gets the timeout for GC pauses, heap memory,
> direct memory, mapped memory, offheap memory, CPU load, network load, total
> out-records, total in-records, backpressure, and everything I can think of.
> But all those metrics show that there's nothing unusual, and it has around
> average values for all those metrics. There are a lot of other containers
> which score higher.
>
> All the metrics are very low because every TaskManager runs on a
> r5.2xlarge machine alone.
>
> I'm trying to debug this for days and I cannot find any explanation for it.
>
> Can someone explain why it's happening?
>
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1593074931633_0011_01_000127 timed out.
>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Thanks
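
P.S. To make the timeout and GC-log suggestions concrete, here is a rough sketch of what the relevant part of flink-conf.yaml might look like. The 120000 ms timeout and the log path /tmp/flink-tm-gc.log are placeholders I made up for illustration, not recommendations, and the GC flags assume a Java 8 JVM (which the stack trace suggests); please adjust them to your setup.

```yaml
# flink-conf.yaml (sketch only; the values below are illustrative assumptions)

# Heartbeat settings are given in milliseconds.
# 10000 is the default interval; 50000 is the default timeout.
heartbeat.interval: 10000
# Hypothetical larger timeout, only to check whether the errors go away.
heartbeat.timeout: 120000

# Java 8 GC logging for the TaskManagers, so that stop-the-world pauses show up
# even when the metrics are missing. The log path is a placeholder.
env.java.opts.taskmanager: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-tm-gc.log
```

If the timeouts disappear with a larger value, that would point toward long pauses or delayed heartbeat handling rather than a TM that is actually lost. The same GC flags can be set for the JM through `env.java.opts.jobmanager`.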