Thanks for the context, Nicolaus.
We are using S3 instead of HDFS.

Best regards
Rainie
On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:
> Hi Rainie,
>
> I found a similar issue on Stack Overflow, though with a quite different
> stack trace:
> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
> Do you replicate data on multiple HDFS nodes, as suggested in the answer
> there?
>
> Best,
> Nico
>
> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thanks Till.
>> We terminated one of the worker nodes.
>> We enabled HA by using Zookeeper.
>> Sure, we will try upgrading the job to a newer version.
>>
>> Best regards
>> Rainie
>>
>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Rainie,
>>>
>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>> you terminating? Have you configured your Yarn cluster to be highly
>>> available in case you are terminating the ResourceManager?
>>>
>>> Flink should retry the operation of starting a new container in case it
>>> fails. If this is not the case, then please upgrade to one of the actively
>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>> version.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <raini...@pinterest.com> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> My Flink application is running version 1.9, and it failed to recover
>>>> (the application was running, but the checkpoint failed and the job
>>>> stopped processing data) during a Hadoop YARN node termination.
>>>>
>>>> *Here is the job manager log error:*
>>>> *2021-07-26 18:02:58,605 INFO
>>>> org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
>>>> while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over
>>>> xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020.
>>>> Trying to fail over immediately.*
>>>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException):
>>>> Operation category READ is not supported in state standby*
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>>>> at
>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>>>> at
>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>>>> at
>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>> at
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>> at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>>>
>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>>>> at
>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>>>> at
>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>>> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>> at
>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>>>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>>> at
>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>> at
>>>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>>> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>>>> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>>>> at java.lang.Iterable.forEach(Iterable.java:75)
>>>> at
>>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> *Here is the error from the task manager:*
>>>> *2021-07-26 18:01:15,313 ERROR
>>>> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue -
>>>> Encountered error while consuming partitions*
>>>> java.io.IOException: Connection reset by peer
>>>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> *2021-07-26 18:01:15,337 WARN
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask - Error
>>>> while canceling task.*
>>>> java.lang.Exception:
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by:
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>>> at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>>> at
>>>> com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>>>> ... 1 more
>>>>
>>>> Any idea what could be the root cause and how to fix it?
>>>>
>>>> Thanks
>>>> Best regards
>>>> Rainie
>>>>
>>>