Hi,

> Could it be that the partition info somehow isn't up to date on the TM
> when the job is restarting?
Partition info should be up to date, or become so eventually - but this
assumes that the JM is able to detect the failure.

The latter may not be the case, as Sihan You wrote previously:
> The strange thing is that only 23 of the TMs are complaining about the
> connection issue. When this exception occurs, the TM they are
> complaining about is still up and alive.

So it looks like the JM-to-TM network links are OK, but some TM-to-TM
links are down. Do you have a way to check connectivity from TM to TM
during this restart loop?
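One way to do that (a sketch only - the host/port below are just the ones from your stack trace, and the pod name is hypothetical) is to probe the other TM's data port from inside a TM pod using bash's /dev/tcp:

```shell
# Sketch: check TM-to-TM connectivity with bash's /dev/tcp pseudo-device.
# Run this from inside one TaskManager pod, e.g. after
#   kubectl exec -it <tm-pod-name> -- bash
# Host/port below are taken from the stack trace; substitute your own.
probe() {
  # $1 = host, $2 = port; prints "reachable" or "unreachable"
  timeout 3 bash -c "</dev/tcp/$1/$2" 2>/dev/null \
    && echo "reachable" || echo "unreachable"
}

probe 100.98.115.117 41245
```

Running this in a loop while the job restarts would show whether the data port is intermittently unreachable even though the TM process is alive.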

Also, it makes sense to check the heartbeat interval and timeout [1] so
that the JM can detect a TM failure quickly enough.

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-interval
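For reference, a flink-conf.yaml sketch (the values are examples, not recommendations; in 1.12 the defaults are heartbeat.interval: 10000 and heartbeat.timeout: 50000, both in milliseconds):

```yaml
# Example values only -- tune for your cluster; both are in milliseconds.
# Lower values let the JM detect a lost TM sooner, at the cost of more
# false positives under long GC pauses or network jitter.
heartbeat.interval: 5000
heartbeat.timeout: 20000
```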

Regards,
Roman


On Tue, May 4, 2021 at 10:17 PM Yichen Liu <arsenalliu...@gmail.com> wrote:
>
> Chime in here since I work with Sihan.
>
> Roman, there isn't much logs beyond this WARN, in fact it should be ERROR 
> since it fail our job and job has to restart.
>
> Here is a fresh example of the "Sending the partition request to 'null'
> failed." exception. The only log we see before the exception was:
>
> timestamp="2021-05-04 14:04:33,014", level="INFO", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend", method="cleanInstanceBasePath(line:462)", message="Closed RocksDB State Backend. Cleaning up RocksDB working directory /tmp/flink-io-9570aace-eec0-4dd9-867f-22a7d367282e/job_00000000000000000000000000000000_op_KeyedProcessOperator_6cf741936dcd5ce8199875ace1f5638a__55_80__uuid_b8ef0675-4355-4b99-9f03-77d1eb713bf4."
>
> timestamp="2021-05-04 14:04:33,633", level="WARN", thread="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12", class="org.apache.flink.runtime.taskmanager.Task", method="transitionState(line:1033)", message="Latest Billing Info Operator -> (Filter, Filter) (55/80)#12 (0f9caefb1122609e3337f2537f7324c3) switched from RUNNING to FAILED."
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>
> but this looks more like a consequence than a cause of the exception.
>
> Note this seems to happen pretty consistently when one of our TMs is
> lost. Could it be that the partition info somehow isn't up to date on
> the TM when the job is restarting?
>
> Also note that we have a pretty large state: each TM holds around 130 GB
> of state, and the TMs are configured with 10 GB of memory and 2700m CPU
> (in k8s units).
>
> On Mon, May 3, 2021 at 8:29 AM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> I see that JM and TM failures are different (from TM, it's actually a
>> warning). Could you please share the ERROR message from TM?
>>
>> Have you tried increasing taskmanager.network.retries [1]?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html#taskmanager-network-retries
>>
>> Regards,
>> Roman
>>
>> On Fri, Apr 30, 2021 at 11:55 PM Sihan You <leo.yo...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > We are experiencing a Netty issue with our Flink cluster, and we
>> > couldn't figure out the cause.
>> >
>> > Below are the stack traces of the exceptions from the TMs' and the
>> > JM's perspectives. We have 85 TMs and one JM in HA mode. The strange
>> > thing is that only 23 of the TMs are complaining about the
>> > connection issue. When this exception occurs, the TM they are
>> > complaining about is still up and alive. This causes our job to be
>> > stuck in a restart loop for a couple of hours before going back to
>> > normal.
>> >
>> > We are using HDFS as the state backend and the checkpoint directory.
>> > The application runs as a standalone job on Kubernetes in our own
>> > data center.
>> >
>> >
>> > ## Job Graph
>> >
>> > the job graph is like this:
>> >
>> > source 1.1 (5 parallelism)  ->
>> >                                union ->
>> > source 1.2 (80 parallelism) ->
>> >                                          connect -> sink
>> > source 2.1 (5 parallelism)  ->
>> >                                union ->
>> > source 2.2 (80 parallelism) ->
>> >
>> >
>> > ## JM's Stacktrace
>> >
>> > ```
>> > message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at java.lang.Thread.run(Thread.java:834) ~[?:?]
>> > Caused by: java.nio.channels.ClosedChannelException
>> >     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     ... 18 more
>> > ```
>> >
>> > ## TM's stacktrace
>> > ```
>> > timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."
>> > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost.
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at java.lang.Thread.run(Thread.java:834) [?:?]
>> > Caused by: java.lang.NullPointerException
>> >     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> >     ... 16 more
>> >
>> > ```
>
>
>
> --
> YL
