Hi Nicolaus,

I double-checked our HDFS config again: replication is set to 1 instead of 2.
I will try the solution you provided.
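
For reference, this is the change we plan to try in hdfs-site.xml (just a sketch, assuming the standard dfs.replication property is what controls this):

  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>

If I understand correctly, this only applies to newly written files, so we may also need to raise the replication of existing files, e.g. with hdfs dfs -setrep -w 2 /path/to/data (the path is a placeholder).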

Thanks again.
Best regards
Rainie

On Wed, Aug 4, 2021 at 10:40 AM Rainie Li <raini...@pinterest.com> wrote:

> Thanks for the context, Nicolaus.
> We are using S3 instead of HDFS.
>
> Best regards
> Rainie
>
> On Wed, Aug 4, 2021 at 12:39 AM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:
>
>> Hi Rainie,
>>
>> I found a similar issue on Stack Overflow, though with a quite different
>> stack trace:
>> https://stackoverflow.com/questions/64400280/flink-unable-to-recover-after-yarn-node-termination
>> Do you replicate data across multiple HDFS nodes, as suggested in the
>> answer there?
>>
>> Best,
>> Nico
>>
>> On Wed, Aug 4, 2021 at 9:24 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Thanks, Till.
>>> We terminated one of the worker nodes.
>>> We enabled HA using ZooKeeper.
>>> Sure, we will try upgrading the job to a newer version.
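>>>
>>> In case it's useful, our flink-conf.yaml HA settings look roughly like
>>> this (a sketch; the quorum hosts and S3 path below are placeholders, not
>>> our real values):
>>>
>>>   high-availability: zookeeper
>>>   high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
>>>   high-availability.zookeeper.path.root: /flink
>>>   high-availability.storageDir: s3://<bucket>/flink/ha/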
>>>
>>> Best regards
>>> Rainie
>>>
>>> On Tue, Aug 3, 2021 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Rainie,
>>>>
>>>> It looks to me as if Yarn is causing this problem. Which Yarn node are
>>>> you terminating? Have you configured your Yarn cluster to be highly
>>>> available in case you are terminating the ResourceManager?
>>>>
>>>> Flink should retry the operation of starting a new container in case it
>>>> fails. If it does not, then please upgrade to one of the actively
>>>> maintained Flink versions (1.12 or 1.13) and check whether it works with
>>>> that version.
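>>>>
>>>> For reference, ResourceManager HA is typically enabled with something
>>>> like the following in yarn-site.xml (a sketch with placeholder IDs and
>>>> hostnames, not specific to your cluster):
>>>>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.ha.enabled</name>
>>>>     <value>true</value>
>>>>   </property>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.cluster-id</name>
>>>>     <value>yarn-cluster</value>
>>>>   </property>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.ha.rm-ids</name>
>>>>     <value>rm1,rm2</value>
>>>>   </property>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.hostname.rm1</name>
>>>>     <value>master1.example.com</value>
>>>>   </property>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.hostname.rm2</name>
>>>>     <value>master2.example.com</value>
>>>>   </property>
>>>>   <property>
>>>>     <name>yarn.resourcemanager.zk-address</name>
>>>>     <value>zk-1:2181,zk-2:2181,zk-3:2181</value>
>>>>   </property>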
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Aug 3, 2021 at 9:56 AM Rainie Li <raini...@pinterest.com> wrote:
>>>>
>>>>> Hi Flink Community,
>>>>>
>>>>> My Flink application is running version 1.9, and it failed to recover
>>>>> (the application was still running, but checkpointing failed and the job
>>>>> stopped processing data) after a Hadoop YARN node was terminated.
>>>>>
>>>>> *Here is the error from the job manager log:*
>>>>> *2021-07-26 18:02:58,605 INFO  org.apache.hadoop.io.retry.RetryInvocationHandler             - Exception while invoking getFileInfo of class ClientNamenodeProtocolTranslatorPB over xenon-pii-prod-004-20210305-namenode-0/10.1.185.175:8020. Trying to fail over immediately.*
>>>>> *org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby*
>>>>> at org.apache.hadoop.hdfs.server.namenode.ha.StandbyState.checkOperation(StandbyState.java:87)
>>>>> at org.apache.hadoop.hdfs.server.namenode.NameNode$NameNodeHAContext.checkOperation(NameNode.java:1774)
>>>>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkOperation(FSNamesystem.java:1313)
>>>>> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3856)
>>>>> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1006)
>>>>> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
>>>>> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>> at java.security.AccessController.doPrivileged(Native Method)
>>>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>>>>> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)
>>>>>
>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
>>>>> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
>>>>> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>> at com.sun.proxy.$Proxy51.getFileInfo(Unknown Source)
>>>>> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>>>>> at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>> at com.sun.proxy.$Proxy52.getFileInfo(Unknown Source)
>>>>> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2116)
>>>>> at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>>>>> at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>>>>> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>> at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>>>>> at org.apache.flink.yarn.Utils.registerLocalResource(Utils.java:258)
>>>>> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:490)
>>>>> at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:613)
>>>>> at org.apache.flink.yarn.YarnResourceManager.startTaskExecutorInContainer(YarnResourceManager.java:415)
>>>>> at java.lang.Iterable.forEach(Iterable.java:75)
>>>>> at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:397)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> *Here is the error from the task manager log:*
>>>>> *2021-07-26 18:01:15,313 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  - Encountered error while consuming partitions*
>>>>> java.io.IOException: Connection reset by peer
>>>>> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>>>> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>>>> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>>>> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>>>> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>>>>> at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
>>>>> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>>>> at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>>>> at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>> *2021-07-26 18:01:15,337 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask           - Error while canceling task.*
>>>>> java.lang.Exception: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:232)
>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>>>>> at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:182)
>>>>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.cancel(KafkaFetcher.java:175)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:818)
>>>>> at com.pinterest.xenon.unified.XenonUnifiedSource.cancel(XenonUnifiedSource.java:453)
>>>>> at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:134)
>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:158)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:528)
>>>>> at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1434)
>>>>> ... 1 more
>>>>>
>>>>> Any idea what could be the root cause and how to fix it?
>>>>>
>>>>> Thanks
>>>>> Best regards
>>>>> Rainie
>>>>>
>>>>
