We are developing a new feature for our Flink application that relies on
joining multiple Kafka streams and uses Flink state to handle joining the
information asynchronously. Recently, as the volume of data has grown,
we have started seeing a couple of exceptions when we try to enable the
feature.

Error 1:

2022-02-22 09:49:00
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager '10.121.184.7/10.121.184.7:43879'.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.decodeMsg(CreditBasedPartitionRequestClientHandler.java:351)
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:240)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelRead(NettyMessageClientDecoderDelegate.java:112)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 0efd8681eda64b072b72baef58722bc0.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:285)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:123)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.addCreditOrResumeConsumption(PartitionRequestQueue.java:172)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:120)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    ... 13 more
Caused by: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 0efd8681eda64b072b72baef58722bc0.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1654)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1700(TaskExecutor.java:183)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2228)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2226)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.lang.Exception: Job leader for job id 0efd8681eda64b072b72baef58722bc0 lost leadership.
    ... 28 more

This tends to be followed by a repeating loop of errors similar to the
one below. (Note: this one comes from a different job, because we rarely
manage to capture the first exception.)

java.lang.Exception: Cannot deploy task Facility ID Enrichment Request Kafka Output -> Sink: Facility ID Enrichment Request Kafka Sink - analytics-pipeline-facility-id-enrichment-requests (35/36) (23ea11c41e058c088114255c2eae3ccf) - TaskManager (10.121.100.6:35993-7cdf23 @ 10.121.100.6 (dataPort=40595)) not responding after a rpcTimeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:611)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@10.121.100.6:35993/user/rpc/taskmanager_0] timed out.
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.121.100.6:35993/user/rpc/taskmanager_0#-1412463596]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.


It seems like the errors are similar to those discussed here:
- https://issues.apache.org/jira/browse/FLINK-14316
- https://cdmana.com/2020/11/20201116104527255b.html

Looking at the TaskManager memory breakdown, every memory pool is below
100% except managed memory. Each of our 6 TaskManagers has 9.1 GB of
managed memory, and I estimate that our total Flink state is about
600 GB. Is it okay to run with that little managed memory for that much
state?
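For reference, here is the ratio being asked about, worked out only from
the figures above (6 TaskManagers at 9.1 GB of managed memory each,
against an estimated 600 GB of state, treating both in the same GB unit):

$$
6 \times 9.1\ \text{GB} = 54.6\ \text{GB},
\qquad
\frac{54.6\ \text{GB}}{600\ \text{GB}} \approx 0.091 \approx \frac{1}{11}
$$

In other words, the whole cluster has roughly 55 GB of managed memory,
about 9% of the estimated state size.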


Thanks.
Jai