[ https://issues.apache.org/jira/browse/FLINK-18427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143951#comment-17143951 ]
Stephan Ewen commented on FLINK-18427:
--------------------------------------

Thanks for reporting this.

I think the reason is that under Java 11, Netty allocates memory from the pool of Java direct memory and is therefore affected by the JVM's MaxDirectMemorySize limit. Under Java 8, it allocates native memory and is not affected by that setting.

For Flink 1.10.0, you probably need an increased value for "Framework Offheap Memory" in higher-parallelism cases. This documentation page has more details: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native

Flink 1.11.0 should reduce the Netty memory usage significantly by reading directly into Flink's managed memory on the receiver side.

[~zjwang], [~pnowojski] would know more details about this.

> Job failed under java 11
> ------------------------
>
>                 Key: FLINK-18427
>                 URL: https://issues.apache.org/jira/browse/FLINK-18427
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, API / DataStream
>            Reporter: Zhang Hao
>            Priority: Critical
>
> flink version: 1.10.0
> deployment mode: cluster
> os: linux redhat7.5
> Job parallelism: greater than 1
>
> My job runs normally under Java 8 but fails under Java 11. The exception info
> is below: Netty failed to send a message. In addition, I found that the job
> fails when its tasks are distributed across multiple nodes; if I set the job's
> parallelism to 1, the job runs normally under Java 11 too.
>
> 2020-06-24 09:52:16
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request to '/170.0.50.19:33320' failed.
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:124)
>     at org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient$1.operationComplete(NettyPartitionRequestClient.java:115)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:531)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:111)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:818)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:718)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:708)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:56)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1102)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1149)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1073)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.io.IOException: Error while serializing message: PartitionRequest(8059a0b47f7ba0ff814ea52427c584e7@6750c1170c861176ad3ceefe9b02f36e:0:2)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:177)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
>     ... 11 more
> Caused by: java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:497)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageEncoder.write(NettyMessage.java:174)
>     ... 12 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>     at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>     at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>     at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:772)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:748)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>     at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage.allocateBuffer(NettyMessage.java:148)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage.allocateBuffer(NettyMessage.java:111)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage.access$200(NettyMessage.java:59)
>     at org.apache.flink.runtime.io.network.netty.NettyMessage$PartitionRequest.write(NettyMessage.java:482)
>     ... 13 more

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
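
The innermost frames of the trace (ByteBuffer.allocateDirect calling Bits.reserveMemory) are the JVM's accounting path for direct buffers, which is what the JVM limit set by -XX:MaxDirectMemorySize applies to. As a rough illustration of that accounting, and not anything taken from Flink or Netty, the following minimal sketch reads the JVM's "direct" buffer pool through the standard BufferPoolMXBean before and after one allocation. The class name DirectMemoryProbe and the 4 MiB size are hypothetical choices for this example.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of Flink or Netty.
public class DirectMemoryProbe {

    /** Returns the bytes currently held in the JVM's "direct" buffer pool, or -1 if the pool is not reported. */
    public static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();

        // Goes through the same JVM accounting as the allocateDirect frame in the trace;
        // it throws OutOfMemoryError("Direct buffer memory") once the direct memory limit is exceeded.
        ByteBuffer buffer = ByteBuffer.allocateDirect(4 * 1024 * 1024);

        long after = directMemoryUsed();
        System.out.println("direct pool before: " + before + " bytes");
        System.out.println("direct pool after:  " + after + " bytes");
        System.out.println("buffer capacity:    " + buffer.capacity() + " bytes");
    }
}
```

Running this inside a JVM started with a small -XX:MaxDirectMemorySize value and a larger allocation size is one way to reproduce the "Direct buffer memory" error in isolation. Whether a given Netty version takes this accounted path depends on the Java version and Netty's settings, as the comment above describes.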