Hi Felipe,

RocksDB runs in the task main thread, which is not responsible for responding to heartbeats, and it mainly uses native memory, which is decoupled from the JVM heap and so adds no GC pressure. The timeout should therefore be unrelated to RocksDB in general, provided your task manager really hit a heartbeat timeout rather than crashing and exiting.

Try to increase the heartbeat timeout [1] and watch the detailed GC logs for anything unusual.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-timeout

Best
Yun Tang

________________________________
From: Ori Popowski <ori....@gmail.com>
Sent: Monday, June 29, 2020 17:44
Cc: user <user@flink.apache.org>
Subject: Re: Timeout when using RockDB to handle large state in a stream app

Hi there,

I'm currently experiencing the exact same issue.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Heartbeat-of-TaskManager-timed-out-td36228.html

I've found out that GC is causing the problem, but I still haven't managed to solve this.

On Mon, Jun 29, 2020 at 12:39 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

Hi community,

I am trying to run a stream application with large state in a standalone Flink cluster [3]. I configured the RocksDB state backend and increased the memory of the Job Manager and Task Manager. However, I am still getting the timeout message "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.". I am using Flink 1.10.1, and here are the settings that I changed in flink-conf.yaml. For "state.checkpoints.dir" I am still using the filesystem. I am not sure whether I need to use HDFS here, since I am testing on only one machine.

jobmanager.heap.size: 12g
taskmanager.memory.process.size: 8g
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink/state

In the stream application I am using RocksDB as well (full code [3]):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink/state", true));

I have some operators that hold a large state when they load a static table into their state. I use them in two aggregate operations [1] and [2].

[1] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L128
[2] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java#L199
[3] https://github.com/felipegutierrez/explore-flink/blob/acb4d4675f60c59f5c3de70c9e0ba82031205744/src/main/java/org/sense/flink/examples/stream/tpch/TPCHQuery03.java

Here is my stack trace error:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1703)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1252)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1220)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:955)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManagerInternal(SlotPoolImpl.java:818)
    at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.releaseTaskManager(SlotPoolImpl.java:777)
    at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:429)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id cb1091d792f52ca4743f345790d87dd5 timed out.
    ... 26 more

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
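
For reference, the suggestion in the reply at the top of this thread (raise the heartbeat timeout and watch the GC logs) could be sketched as the flink-conf.yaml fragment below. This is only an illustration under stated assumptions: the timeout value of 120000 ms is arbitrary, the log path /tmp/flink/gc-taskmanager.log is hypothetical, and the GC flags assume a Java 8 JVM, which the stack trace line numbers suggest but do not prove. Newer JVMs use -Xlog:gc* instead.

```yaml
# Sketch only: values and paths are illustrative assumptions, not taken from the thread.

# Heartbeat timeout in milliseconds (the Flink default is 50000).
heartbeat.timeout: 120000

# Pass GC logging flags to the TaskManager JVM (Java 8 style flags).
# The log file path is hypothetical; pick a directory that exists on the machine.
env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink/gc-taskmanager.log"
```

If the GC log shows pauses approaching the heartbeat timeout, that would be consistent with the GC explanation given earlier in the thread. A longer timeout would then only hide the symptom, and the heap sizing would be the thing to revisit.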