Hi,
We are experiencing a Netty issue with our Flink cluster whose cause we haven't been able to figure out.
Below are the stack traces of the exceptions from the JM's and the TMs' perspectives. We
have 85 TMs and one JM in HA mode. The strange thing is that only 23 of the
TMs complain about the connection issue, and when this exception occurs,
the TM they are complaining about is still up and alive. This leaves our
job stuck in a restart loop for a couple of hours before it returns to
normal.
We are using HDFS for the state backend and the checkpoint directory.
The application runs in our own data center, on Kubernetes, as a
standalone job.
## Job Graph
The job graph looks like this:

- source 1.1 (parallelism 5) and source 1.2 (parallelism 80) are unioned.
- source 2.1 (parallelism 5) and source 2.2 (parallelism 80) are unioned.
- The two unioned streams are connected, and the result goes to the sink.
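To get a feel for how many network channels this topology creates, here is a small back-of-the-envelope sketch in plain Java (no Flink dependencies). It assumes two things the post does not state: the connect operator runs with parallelism 80 (the failing "PLI Deduplicate Operator" subtasks are numbered out of 80), and both unioned inputs reach it through an all-to-all exchange such as `keyBy`. The class and method names are hypothetical.

```java
import java.util.List;

/**
 * Back-of-the-envelope channel count for the job graph above.
 * ASSUMPTIONS, not stated in the post: the connect operator runs with
 * parallelism 80, and both unioned inputs reach it through an all-to-all
 * (e.g. keyed) exchange, so every upstream subtask feeds every connect subtask.
 */
public class JobGraphChannels {

    /** One unioned input, described by the parallelism of each source in the union (hypothetical type). */
    static final class UnionInput {
        final String name;
        final List<Integer> sourceParallelisms;

        UnionInput(String name, List<Integer> sourceParallelisms) {
            this.name = name;
            this.sourceParallelisms = List.copyOf(sourceParallelisms);
        }

        /** A union has no subtasks of its own; its producers are the sources' subtasks. */
        int upstreamSubtasks() {
            return sourceParallelisms.stream().mapToInt(Integer::intValue).sum();
        }
    }

    /** Logical input channels of ONE connect subtask under the all-to-all assumption. */
    static int inputChannelsPerSubtask(List<UnionInput> inputs) {
        return inputs.stream().mapToInt(UnionInput::upstreamSubtasks).sum();
    }

    /** Logical channels across all connect subtasks (this is not the number of TCP connections). */
    static long totalChannels(List<UnionInput> inputs, int connectParallelism) {
        return (long) inputChannelsPerSubtask(inputs) * connectParallelism;
    }

    public static void main(String[] args) {
        List<UnionInput> inputs = List.of(
                new UnionInput("source 1.1 + source 1.2", List.of(5, 80)),
                new UnionInput("source 2.1 + source 2.2", List.of(5, 80)));
        int connectParallelism = 80; // assumption, see class comment
        System.out.println("channels per connect subtask: " + inputChannelsPerSubtask(inputs));
        System.out.println("channels into connect in total: " + totalChannels(inputs, connectParallelism));
    }
}
```

Under these assumptions each connect subtask reads from 85 + 85 = 170 upstream subtasks (13,600 logical channels in total), which are likely spread across most of the 85 TMs. A single failed connection to any one of those TMs can therefore be enough to fail a consuming subtask.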
## JM's Stack Trace
```
message="PLI Deduplicate Operator (60/80) (5d2b9fba2eaeae452068bc53e4232d0c) switched from RUNNING to FAILED on 100.98.115.117:6122-924d20 @ 100.98.115.117 (dataPort=41245)."
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to 'null' failed.
	at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:137) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:125) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyMessage.writeToChannel(NettyMessage.java:737) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:521) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:171) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.lang.Thread.run(Thread.java:834) ~[?:?]
Caused by: java.nio.channels.ClosedChannelException
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	... 18 more
```
## TM's Stack Trace
```
timestamp="2021-04-30 20:23:25,401", level="WARN", thread="PLI Deduplicate Operator (6/80)#6", class="org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory", method="connectWithRetries(line:121)", message="Failed 1 times to connect to /100.98.115.117:41245. Retrying."
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager '/100.98.115.117:41245' has failed. This might indicate that the remote task manager has been lost.
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:145) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connectWithRetries(PartitionRequestClientFactory.java:114) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:81) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:70) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:179) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.internalRequestPartitions(SingleInputGate.java:321) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
	at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.lang.NullPointerException
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:59) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient.<init>(NettyPartitionRequestClient.java:74) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.connect(PartitionRequestClientFactory.java:136) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
	... 16 more
```
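Reading the TM trace bottom-up, the root cause is a `NullPointerException` raised by `Preconditions.checkNotNull` inside the `NettyPartitionRequestClient` constructor, called from `PartitionRequestClientFactory.connect`. `connect` then reports it as a `RemoteTransportException` saying the remote TM "might" have been lost, and `connectWithRetries` logs the failure and retries. The wording of that message therefore does not by itself show that the remote TM was down. Below is a hypothetical, simplified Java model of that call chain, not Flink's code: every class and method name is made up, and which constructor argument is null is an assumption (here, a handler lookup that returns null).

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified model of the failure path in the TM stack trace.
 * None of these classes are Flink's; they only mirror the shape of the frames:
 * a client constructor null-checks a collaborator, connect() wraps ANY exception
 * into a "remote task manager may be lost" error, and connectWithRetries() retries.
 */
public class ConnectFailureSketch {

    /** Stand-in for a handler looked up on a freshly opened channel (hypothetical). */
    static final class Handler {}

    /** Stand-in for the partition request client; the null check mirrors the NPE frame. */
    static final class PartitionClient {
        final Handler handler;

        PartitionClient(Handler handler) {
            // Throws NullPointerException when no handler was found.
            this.handler = Objects.requireNonNull(handler);
        }
    }

    /** Stand-in for the exception type thrown on a failed connect (hypothetical). */
    static final class RemoteConnectException extends Exception {
        RemoteConnectException(String address, Throwable cause) {
            super("Connecting to remote task manager '" + address
                    + "' has failed. This might indicate that the remote task manager has been lost.", cause);
        }
    }

    /** Any runtime failure while building the client is reported as a possibly lost remote TM. */
    static PartitionClient connect(String address, Supplier<Handler> handlerLookup)
            throws RemoteConnectException {
        try {
            return new PartitionClient(handlerLookup.get());
        } catch (RuntimeException e) {
            throw new RemoteConnectException(address, e);
        }
    }

    /** Tries once, then up to maxRetries more times; rethrows the last failure. */
    static PartitionClient connectWithRetries(String address, Supplier<Handler> handlerLookup, int maxRetries)
            throws RemoteConnectException {
        int failures = 0;
        while (true) {
            try {
                return connect(address, handlerLookup);
            } catch (RemoteConnectException e) {
                failures++;
                if (failures > maxRetries) {
                    throw e;
                }
                System.out.println("Failed " + failures + " times to connect to " + address + ". Retrying.");
            }
        }
    }

    public static void main(String[] args) {
        // Nothing here is "down": the handler lookup simply yields null (assumed cause).
        try {
            connectWithRetries("/100.98.115.117:41245", () -> null, 1);
        } catch (RemoteConnectException e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

Running `main` prints one "Failed 1 times to connect to /100.98.115.117:41245. Retrying." line, followed by the wrapped "might indicate that the remote task manager has been lost" message with `NullPointerException` as its cause, even though no peer in the model has gone away.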