Hi Nicolaus,

I double-checked our HDFS config again; it is set to 1 instead of 2. I will try the solution you provided.
Thanks again.

Best regards
Rainie

On Wed, Aug 4, 2021 at 10:40 AM Rainie Li <raini...@pinterest.com> wrote:

> Thanks for the context Nicolaus.
> We are using S3 instead of HDFS.
>
> Best regards
> Rainie
>
> On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:
>
>> Hi Rainie,
>>
>> I found a similar issue on stackoverflow, though quite different
>> stacktrace:
>> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
>> Do you replicate data on multiple hdfs nodes like suggested in the answer
>> there?
>>
>> Best,
>> Nico
>>
>> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks Till.
>>> We terminated one of the worker nodes.
>>> We enabled HA by using Zookeeper.
>>> Sure, we will try upgrade job to newer version.
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>>> you terminating? Have you configured your Yarn cluster to be highly
>>>> available in case you are terminating the ResourceManager?
>>>>
>>>> Flink should retry the operation of starting a new container in case it
>>>> fails. If this is not the case, then please upgrade to one of the actively
>>>> maintained Flink versions (1.12 or 1.13) and try whether it works with this
>>>> version.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <raini...@pinterest.com> wrote:
>>>>
>>>>> Hi Flink Community,
>>>>>
>>>>> My flink application is running version 1.9 and it failed to recover
>>>>> (application was running but checkpoint failed and job stopped to process
>>>>> data) during hadoop yarn node termination.
>>>>>
>>>>> *Here is job manager log error:*
>>>>> *2021-07-26 18:02:58,605 INFO org.apache.hadoop.io.retry.RetryInvocationHandler - Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020. Trying to fail over immediately.*
>>>>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby*
>>>>>     at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>>>>
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>     at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>>>>     at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>     at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>>>>     at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>>>>>     at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>>>>>     at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>>>>>     at org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>>>>>     at java.lang.Iterable.forEach(Iterable.java:75)
>>>>>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> *Here is error from task manager:*
>>>>> *2021-07-26 18:01:15,313 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions*
>>>>> java.io.IOException: Connection reset by peer
>>>>>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>>     at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *2021-07-26 18:01:15,337 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Error while canceling task.*
>>>>> java.lang.Exception: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>>>>     at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>>>>     at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>>>>>     ... 1 more
>>>>>
>>>>> Any idea what could be the root cause and how to fix it?
>>>>>
>>>>> Thanks
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>