Hi,

> Could it be somehow partition info isn't up to date on TM when job is
> restarting?

Partition info should be up to date, or become so eventually, but this assumes
that the JM is able to detect the failure.
The latter may not be the case, as Sihan You wrote previously:

> The strange thing is that only 23 of the TMs are complaining about the
> connection issue.
> When this exception occurs, the TM they are complaining about is still up and
> live.

So it looks like the JM-to-TM network links are OK, but some TM-to-TM links
are down. Is there a way for you to check the connection from TM to TM during
this restart loop?

It also makes sense to check the heartbeat interval and timeout [1] so that
the JM can detect a TM failure quickly enough.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-interval

Regards,
Roman

On Tue, May 4, 2021 at 10:17 PM Yichen Liu <arsenalliu...@gmail.com> wrote:
>
> Chiming in here since I work with Sihan.
>
> Roman, there aren't many logs beyond this WARN; in fact it should be an ERROR,
> since it fails our job and the job has to restart.
>
> Here is a fresh example of the "Sending the partition request to 'null'
> failed." exception. The only log we see before the exception was:
>
> timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", method="cleanInstanceBasePath(line:462)", message="Closed RocksDB State Backend. Cleaning up RocksDB working directory /tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job_00000000000000000000000000000000_op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."
>
> timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.runtime.taskmanager.Task", method="transitionState(line:1033)", message="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched from RUNNING to FAILED."
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>
> but this looks more like a consequence than the cause of the exception.
>
> Note that this seems to happen pretty consistently when one of our TMs is
> lost. Could it be somehow partition info isn't up to date on TM when job is
> restarting?
>
> Also note that we have a pretty large state: each TM holds around 130 GB of
> state, and TMs are configured with 10 GB of memory and 2700m CPU (in k8s units).
>
> On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> I see that JM and TM failures are different (from TM, it's actually a
>> warning). Could you please share the ERROR message from TM?
>>
>> Have you tried increasing taskmanager.network.retries [1]?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 30, 2021 at 11:55 PM Sihan You <leo.yo...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > We are experiencing a Netty issue with our Flink cluster whose cause we
>> > couldn't figure out.
>> >
>> > Below are the exception stack traces from the TMs' and the JM's perspectives.
>> > We have 85 TMs and one JM in HA mode. The strange thing is that only 23 of
>> > the TMs are complaining about the connection issue. When this exception
>> > occurs, the TM they are complaining about is still up and live.
>> > This causes our job to be stuck in a restart loop for a couple of hours
>> > before it returns to normal.
>> >
>> > We are using HDFS as the state backend and the checkpoint dir.
>> > The application is running in our own data center, in Kubernetes, as a
>> > standalone job.
>> >
>> >
>> > ## Job Graph
>> >
>> > The job graph looks like this:
>> >
>> > source 1.1 (5 parallelism)  ->
>> >                                union ->
>> > source 1.2 (80 parallelism) ->
>> >                                         connect -> sink
>> > source 2.1 (5 parallelism)  ->
>> >                                union ->
>> > source 2.2 (80 parallelism) ->
>> >
>> >
>> > ## JM's Stacktrace
>> >
>> > ```
>> > message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at java.lang.Thread.run(Thread.java:834) ~[?:?]
>> > Caused by: java.nio.channels.ClosedChannelException
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     ... 18 more
>> > ```
>> >
>> > ## TM's stacktrace
>> > ```
>> > timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."
>> > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost.
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at java.lang.Thread.run(Thread.java:834) [?:?]
>> > Caused by: java.lang.NullPointerException
>> >     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     ... 16 more
>> > ```
>
> --
> YL
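Roman's suggestion to check TM-to-TM connectivity during the restart loop can be approximated with a plain TCP probe. The following is a minimal sketch, not part of Flink: the class `TmConnectivityProbe`, its `probe` method, and the attempt/timeout/backoff values are all hypothetical. It assumes you can run a small JVM program inside one of the complaining TM pods and point it at the data port of the TM that is reported as lost (for example `100.98.115.117:41245` from the logs above). It only checks that a TCP connection can be established; it does not exercise Flink's Netty partition-request protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical diagnostic helper (not part of Flink): checks whether a plain TCP
 * connection can be opened to a remote TaskManager's data port, retrying a few
 * times. It does not speak Flink's Netty partition-request protocol.
 */
final class TmConnectivityProbe {

    /** Outcome of a probe: whether a connection succeeded and after how many attempts. */
    static final class Result {
        final boolean reachable;
        final int attempts;
        final String lastError; // null when the probe succeeded

        Result(boolean reachable, int attempts, String lastError) {
            this.reachable = reachable;
            this.attempts = attempts;
            this.lastError = lastError;
        }

        @Override
        public String toString() {
            return reachable
                    ? "reachable after " + attempts + " attempt(s)"
                    : "unreachable after " + attempts + " attempt(s): " + lastError;
        }
    }

    private TmConnectivityProbe() {}

    /**
     * Tries to connect to host:port up to maxAttempts times. Each attempt waits at most
     * connectTimeoutMs; a failed attempt is followed by a pause of backoffMs.
     */
    static Result probe(String host, int port, int maxAttempts, int connectTimeoutMs, long backoffMs)
            throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        String lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // try-with-resources closes the socket whether or not the connect succeeds
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
                return new Result(true, attempt, null);
            } catch (IOException e) {
                // e.g. ConnectException (refused), SocketTimeoutException, UnknownHostException
                lastError = e.toString();
            }
            if (attempt < maxAttempts) {
                Thread.sleep(backoffMs);
            }
        }
        return new Result(false, maxAttempts, lastError);
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 2) {
            System.err.println("usage: java TmConnectivityProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // Illustrative values only: 3 attempts, 2 s connect timeout, 500 ms between attempts
        System.out.println(host + ":" + port + " -> " + probe(host, port, 3, 2000, 500));
    }
}
```

On Java 11+ it can be run straight from source, e.g. `java TmConnectivityProbe.java 100.98.115.117 41245`, from an affected TM. If the probe fails from some TMs while the JM still considers the target TM alive, that would support the theory that specific TM-to-TM links are down rather than the TM itself.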