Hi!

Since moving to the new execution mode in Flink 1.7.0, we have observed
some pretty bad stability issues with YARN execution.

It's pretty hard to understand what's going on, so sorry for the vague
description, but here is what seems to happen:

In some cases, when a bigger job fails (let's say 30 TaskManagers with 10
slots each) and tries to recover, we can observe TaskManagers starting to
fail.
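
For completeness, such a job is submitted to YARN roughly like this (the
memory setting and jar name are illustrative, not our exact command):

  flink run -m yarn-cluster -yn 30 -ys 10 -ytm 4096 our-streaming-job.jar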

The errors usually look like this:

20181220T141057.132+0100  INFO The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run() @ 1137]
20181220T141057.133+0100  INFO Closing TaskExecutor connection container_e15_1542798679751_0033_01_000021 because: The heartbeat of TaskManager with id container_e15_1542798679751_0033_01_000021 timed out. [org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection() @ 822]
20181220T141057.135+0100  INFO Execute processors -> (Filter config stream -> (Filter Failures, Flat Map), Filter BEA) (168/180) (3e9c164e4c0594f75c758624815265f1) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The assigned slot container_e15_1542798679751_0033_01_000021_0 was removed.
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:893)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:863)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:1058)
        at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:385)
        at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:825)
        at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener$1.run(ResourceManager.java:1139)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [org.apache.flink.runtime.executiongraph.Execution.transitionState() @ 1342]


The job then goes into a restart loop where TaskManagers come and go; the
UI sometimes displays more than 30 TaskManagers and some extra slots. In
some instances I have also seen "GC overhead limit exceeded" during the
recovery, which is very strange.
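
For anyone trying to reproduce this: the heartbeat knobs involved in the
timeouts above live in flink-conf.yaml; the values below are just the
documented 1.7 defaults, a sketch rather than necessarily what we run with:

  heartbeat.interval: 10000   # ms between heartbeat requests
  heartbeat.timeout: 50000    # ms without a response before a TaskManager is considered lost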

I suspect there is something strange going on, maybe some broken logic in
the slot allocation or a memory leak.

Has anyone observed anything similar so far?
It seems to only affect some of our larger jobs. This wasn't a problem in
previous Flink releases, where we always used the "legacy" execution mode.
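
To be clear, by execution mode I mean the mode switch in flink-conf.yaml,
i.e. we used to run with

  mode: legacy

and with 1.7.0 we are now on the default

  mode: new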

Thank you!
Gyula
