我帮你 cc 了对 runtime 更熟悉的 zhuzhu 同学。

Best,
Kurt


On Mon, Mar 9, 2020 at 6:44 PM lucas.wu <[email protected]> wrote:

> 没人回复大概是之前没人遇到过这种问题,所以下午看了flink的代码,终于有了点头绪。
> 原因分析:
> 这个异常的原因就是在task出现异常之后,它需要调用updateTaskExecutionState(TaskExecutionState
> taskExecutionState)这个rpc接口去通知flink jobmanager
>
> 去改变对应task的状态并且重启task。但是呢,taskExecutionState这个参数里面有个error属性,当我的的task打出来的错误栈太多的时候,在序列化的之后超过了
> rpc接口要求的最大数据大小(也就是maximum akka framesize),导致调用updateTaskExecutionState
> 这个rpc接口失败,jobmanager无法获知这个task已经fail
>
> 的状态,也无法重启。这就导致了一系列连锁反应,其中一个就是我的checkpoint一直失败,原因就是我的task其实已经释放了,但是jobmanger无法感知。
>
> 结论:
> 这个算不算flink的一个bug,对于task已经失效,但是无法通知到jobmanger,导致该task一直无法重启。
> 原始邮件
> 发件人:[email protected]
> 收件人:[email protected]
> 发送时间:2020年3月9日(周一) 11:06
> 主题:flink 长时间运行后出现报错
>
>
> 大家好: 我的flink程序的主要功能是从kafka消费数据,简单处理后,通过jdbc
> outputformat发往mysql,但是长时间运行后,报下面的错。请问是什么原因造成的,我增加参数就可以解决吗? 2020-03-08
> 06:10:30,480 WARN org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler -
> Could not create remote rpc invocation message. Failing rpc invocation
> because... java.io.IOException: The rpc invocation size 34500577 exceeds
> the maximum akka framesize. at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
> at com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at
> akka.actor.ActorCell.invoke(ActorCell.scala:496) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-03-08 06:10:30,480 ERROR
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Caught exception while
> executing runnable in main thread.
> java.lang.reflect.UndeclaredThrowableException at
> com.sun.proxy.$Proxy12.updateTaskExecutionState(Unknown Source) at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1428)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1458)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2200(TaskExecutor.java:154)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1757)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:130) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:527) at
> akka.actor.ActorCell.invoke(ActorCell.scala:496) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at
> akka.dispatch.Mailbox.run(Mailbox.scala:224) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: The rpc invocation size 34500577 exceeds
> the maximum akka framesize. at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:271)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:200)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:129)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:78)
> ... 26 more

回复