I'd go with the network congestion theory for the time being; then the
only remedy is throttling the download of said list, or somehow reducing
the size of it significantly.
What the task thread is doing doesn't matter with regard to HA; it may
cause checkpoints to time out, but should have no other effects (unless
it magically consumes all CPU resources of the system).
If you block in a user function, you're just blocking the task thread;
nothing else.
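
If you do go the throttling route, a minimal sketch with Guava's
RateLimiter could look like the following (the method name and chunk
size are just illustrative, not a recommendation for your exact setup):

import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public final class ThrottledCopy {

    /**
     * Copies the list download while capping throughput at maxBytesPerSecond,
     * so a multi-GB reload doesn't saturate the link.
     */
    public static void copyThrottled(InputStream in, OutputStream out, long maxBytesPerSecond)
            throws IOException {
        // One permit == one byte; acquire(n) blocks until the rate budget allows n more bytes.
        RateLimiter limiter = RateLimiter.create(maxBytesPerSecond);
        byte[] buffer = new byte[64 * 1024];
        int read;
        while ((read = in.read(buffer)) != -1) {
            limiter.acquire(read);
            out.write(buffer, 0, read);
        }
    }

    private ThrottledCopy() {}
}

Throttling only spreads the transfer over a longer window, of course,
but it keeps enough bandwidth free for heartbeats and ZooKeeper traffic.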
On 11/5/2020 10:40 PM, Theo Diefenthal wrote:
Hi there,
I have a stream where I reload a huge list from time to time. I know
there are various Side-Input patterns, but none of them seemed perfect,
so I stuck with an easy approach: I use a Guava Cache, and if it has
expired when a new element comes in, processing of that element is
blocked until the new list is loaded.
That approach has been running in production for a while now and works
fine, as the cache has a mechanism to reload the list only when it has
actually changed.
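
Roughly, the setup looks like this (heavily simplified; listVersion()
and fetchList() here are just placeholders for the real change check and
download):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public abstract class BlockingListCache {

    // Last successfully loaded list and its version marker.
    private volatile String loadedVersion = "";
    private volatile List<String> loadedList = Collections.emptyList();

    // Single-entry cache: after expiry, the next get() blocks the caller
    // (i.e. the task thread) inside load() until the check/reload is done.
    private final LoadingCache<String, List<String>> cache =
            CacheBuilder.newBuilder()
                    .expireAfterWrite(15, TimeUnit.MINUTES)
                    .build(new CacheLoader<String, List<String>>() {
                        @Override
                        public List<String> load(String key) {
                            String currentVersion = listVersion();
                            if (!currentVersion.equals(loadedVersion)) {
                                loadedList = fetchList(); // the multi-GB download
                                loadedVersion = currentVersion;
                            }
                            return loadedList;
                        }
                    });

    /** Called from the MapFunction for every element. */
    public List<String> current() throws ExecutionException {
        return cache.get("list");
    }

    protected abstract String listVersion();   // e.g. a timestamp or ETag of the list
    protected abstract List<String> fetchList();
}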
Now, today the list grew from a few hundred MB to multiple GB, at a time
when the network in general was already a bit congested. One TaskManager
needed roughly 4 minutes to load the list, but after 30 seconds it
reported that it had lost its connection to ZooKeeper and thus no longer
knew the leading JobManager, which led to a crash loop. That
crash-and-restart loop continued for 30 minutes, until the list was
rolled back and then loaded successfully again.
Now my questions:
* If processing of an element blocks, I understand that it's also not
possible to perform checkpoints at that time, but I didn't expect
ZooKeeper, heartbeats or other threads of the TaskManager to time out.
Was that just a coincidence of the network being congested, or is it
something in the design of Flink that a long blocking call can lead to
crashes (other than X checkpoints timing out followed by a configured
forced crash)? Which threads can be blocked in Flink during a map in a
MapFunction?
* For this kind of cached reload, should I switch to async I/O or just
put the loading of the list into a background thread? In my case, it's
not really important that processing is blocked until the list is
loaded. And in the case of async I/O: 99.999% of the events would return
directly and would thus not be async; it's always just a single one that
triggers the reload of the list, so it doesn't seem perfectly suited
here? A rough sketch of the background-thread variant I have in mind is
below.
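
(Simplified sketch; fetchList() and enrich() are placeholders, not what
I run today:)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class EnrichWithList extends RichMapFunction<String, String> {

    private transient volatile List<String> list;
    private transient volatile long lastLoaded;
    private transient ExecutorService reloader;
    private transient AtomicBoolean reloadRunning;

    @Override
    public void open(Configuration parameters) {
        // Elements processed before the first reload finishes see an empty list;
        // in my case that (or seeing a stale list during a reload) is acceptable.
        list = Collections.emptyList();
        lastLoaded = 0L;
        reloader = Executors.newSingleThreadExecutor();
        reloadRunning = new AtomicBoolean(false);
    }

    @Override
    public String map(String value) {
        boolean stale = System.currentTimeMillis() - lastLoaded > 15 * 60 * 1000L;
        if (stale && reloadRunning.compareAndSet(false, true)) {
            // Kick off the reload in the background; the task thread keeps running.
            reloader.submit(() -> {
                try {
                    List<String> fresh = fetchList(); // the slow, multi-GB download
                    list = fresh;
                    lastLoaded = System.currentTimeMillis();
                } finally {
                    reloadRunning.set(false);
                }
            });
        }
        return enrich(value, list); // always uses whatever list is currently loaded
    }

    @Override
    public void close() {
        if (reloader != null) {
            reloader.shutdownNow();
        }
    }

    protected abstract List<String> fetchList();
    protected abstract String enrich(String value, List<String> list);
}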
I'm running Flink 1.11 and here's the relevant excerpt from the log:
2020-11-05T09:41:40.933865+01:00 [WARN] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9
2020-11-05T09:41:40.936488+01:00 [INFO] Client session timed out, have not heard from server in 33897ms for sessionid 0x374de97ba0afac9, closing socket connection and attempting reconnect
2020-11-05T09:41:41.042032+01:00 [INFO] State change: SUSPENDED
2020-11-05T09:41:41.168802+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169276+01:00 [WARN] Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-11-05T09:41:41.169514+01:00 [INFO] Close ResourceManager connection e4d1f9acca4ea3c5a793877467218452.
2020-11-05T09:41:41.169514+01:00 [INFO] JobManager for job 0dcfb212136daefbcbfe480c6a260261 with leader id 8440610cd5de998bd6c65f3717de42b8 lost leadership.
2020-11-05T09:41:41.185104+01:00 [INFO] Close JobManager connection for job 0dcfb212136daefbcbfe480c6a260261.
2020-11-05T09:41:41.185354+01:00 [INFO] Attempting to fail task externally .... (c596aafa324b152911cb53ab4e6d1cc2).
2020-11-05T09:41:41.187980+01:00 [WARN] ... (c596aafa324b152911cb53ab4e6d1cc2) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 0dcfb212136daefbcbfe480c6a260261 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1415)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1300(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Job leader for job id 0dcfb212136daefbcbfe480c6a260261 lost leadership.
    ... 24 more
Best regards
Theo