[ https://issues.apache.org/jira/browse/FLINK-33178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17771788#comment-17771788 ]
Emre Kartoglu commented on FLINK-33178:
---------------------------------------

[~martijnvisser] Thanks for the comment, you're right to point that out. I double-checked, and the master branch still has the same code path leading to the same Netty call:
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java#L320C21-L320C28]

Unless Netty has changed something in this area, I believe the bottleneck is still there. Netty's SslHandler still makes the same call to SSLEngine:
[https://github.com/netty/netty/blob/d773f37e3422b8bc38429bbde94583173c3b7e4a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java#L809]

So I believe the same bottleneck is likely present at the current master head. It also depends on the JDK in use, so users may be able to work around the issue, for example by upgrading to a newer JDK version.

I'm happy to have reported the issue, but please feel free to close the ticket if you think it is best addressed elsewhere, or if you think we should first observe it in practice with newer Flink versions. Judging by the code path, I believe newer versions would be affected too.

> Highly parallel apps suffer from bottleneck in NativePRNG
> ----------------------------------------------------------
>
>                 Key: FLINK-33178
>                 URL: https://issues.apache.org/jira/browse/FLINK-33178
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.2
>            Reporter: Emre Kartoglu
>            Priority: Major
>
> I observed the thread dumps below, which highlight a potential bottleneck in
> Flink/Netty/JDK. The application (Flink 1.13) from which I took the thread
> dumps had very high parallelism and ran on nodes with more than 150 GB of RAM.
> There appears to be a call to "Arrays.copyOfRange" inside a synchronized
> block in "sun.security.provider.NativePRNG"; while one thread executes it,
> other threads are blocked waiting for the lock on the same synchronized block.
> This appears to be a problem only with highly parallel applications. I don't
> know exactly at what parallelism it starts becoming a problem, or how much of
> a bottleneck it actually is.
> I was also slightly hesitant about creating a Flink ticket, as the improvement
> could well be made in Netty or even in the JDK. But I believe we should have a
> record of the issue in Flink Jira.
> Related: [https://bugs.openjdk.org/browse/JDK-8278371]
>
>
> {code:java}
> "Flink Netty Server (6121) Thread 43" #930 daemon prio=5 os_prio=0 cpu=2298176.43ms elapsed=44352.31s allocated=155G defined_classes=0 tid=0x00007f0a3397f800 nid=0x519 waiting for monitor entry [0x00007efc5d549000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
> 	at sun.security.provider.NativePRNG$RandomIO.implNextBytes(java.base@11.0.18/NativePRNG.java:544)
> 	- waiting to lock <0x00007f0b62c2eee8> (a java.lang.Object)
> 	at sun.security.provider.NativePRNG.engineNextBytes(java.base@11.0.18/NativePRNG.java:220)
> 	at java.security.SecureRandom.nextBytes(java.base@11.0.18/SecureRandom.java:751)
> 	at sun.security.ssl.SSLCipher$T11BlockWriteCipherGenerator$BlockWriteCipher.encrypt(java.base@11.0.18/SSLCipher.java:1498)
> 	at sun.security.ssl.OutputRecord.t10Encrypt(java.base@11.0.18/OutputRecord.java:441)
> 	at sun.security.ssl.OutputRecord.encrypt(java.base@11.0.18/OutputRecord.java:345)
> 	at sun.security.ssl.SSLEngineOutputRecord.encode(java.base@11.0.18/SSLEngineOutputRecord.java:287)
> 	at sun.security.ssl.SSLEngineOutputRecord.encode(java.base@11.0.18/SSLEngineOutputRecord.java:189)
> 	at sun.security.ssl.SSLEngineImpl.encode(java.base@11.0.18/SSLEngineImpl.java:285)
> 	at sun.security.ssl.SSLEngineImpl.writeRecord(java.base@11.0.18/SSLEngineImpl.java:231)
> 	at sun.security.ssl.SSLEngineImpl.wrap(java.base@11.0.18/SSLEngineImpl.java:136)
> 	- eliminated <0x00007f0b6aab70c8> (a sun.security.ssl.SSLEngineImpl)
> 	at sun.security.ssl.SSLEngineImpl.wrap(java.base@11.0.18/SSLEngineImpl.java:116)
> 	- locked <0x00007f0b6aab70c8> (a sun.security.ssl.SSLEngineImpl)
> 	at javax.net.ssl.SSLEngine.wrap(java.base@11.0.18/SSLEngine.java:522)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1071)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:843)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:811)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:792)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:246)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:110)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:173)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:89)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$$Lambda$1875/0x00007efc8a13b4b0.run(Unknown Source)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 	at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)
>
> "Flink Netty Server (6121) Thread 42" #929 daemon prio=5 os_prio=0 cpu=2371118.12ms elapsed=44352.38s allocated=162G defined_classes=0 tid=0x00007f0a3397e800 nid=0x518 runnable [0x00007f0a1997c000]
>    java.lang.Thread.State: RUNNABLE
> 	at java.util.Arrays.copyOfRange(java.base@11.0.18/Arrays.java:4030)
> 	at sun.security.provider.NativePRNG$RandomIO.implNextBytes(java.base@11.0.18/NativePRNG.java:554)
> 	- locked <0x00007f0b62c2eee8> (a java.lang.Object)
> 	at sun.security.provider.NativePRNG.engineNextBytes(java.base@11.0.18/NativePRNG.java:220)
> 	at java.security.SecureRandom.nextBytes(java.base@11.0.18/SecureRandom.java:751)
> 	at sun.security.ssl.SSLCipher$T11BlockWriteCipherGenerator$BlockWriteCipher.encrypt(java.base@11.0.18/SSLCipher.java:1498)
> 	at sun.security.ssl.OutputRecord.t10Encrypt(java.base@11.0.18/OutputRecord.java:441)
> 	at sun.security.ssl.OutputRecord.encrypt(java.base@11.0.18/OutputRecord.java:345)
> 	at sun.security.ssl.SSLEngineOutputRecord.encode(java.base@11.0.18/SSLEngineOutputRecord.java:287)
> 	at sun.security.ssl.SSLEngineOutputRecord.encode(java.base@11.0.18/SSLEngineOutputRecord.java:189)
> 	at sun.security.ssl.SSLEngineImpl.encode(java.base@11.0.18/SSLEngineImpl.java:285)
> 	at sun.security.ssl.SSLEngineImpl.writeRecord(java.base@11.0.18/SSLEngineImpl.java:231)
> 	at sun.security.ssl.SSLEngineImpl.wrap(java.base@11.0.18/SSLEngineImpl.java:136)
> 	- eliminated <0x00007f183702cb78> (a sun.security.ssl.SSLEngineImpl)
> 	at sun.security.ssl.SSLEngineImpl.wrap(java.base@11.0.18/SSLEngineImpl.java:116)
> 	- locked <0x00007f183702cb78> (a sun.security.ssl.SSLEngineImpl)
> 	at javax.net.ssl.SSLEngine.wrap(java.base@11.0.18/SSLEngine.java:522)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:1071)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:843)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:811)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:792)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:294)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:246)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:110)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:173)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
> 	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1428)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:913)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:89)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue$$Lambda$1875/0x00007efc8a13b4b0.run(Unknown Source)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
> 	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> 	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> 	at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)
> {code}
>
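The contention pattern in the thread dumps can be approximated outside Flink and Netty with a small stand-alone program. In the dumps, `BlockWriteCipher.encrypt` calls `SecureRandom.nextBytes` while encrypting an outgoing record, and both Netty threads compete for the same NativePRNG lock (`<0x00007f0b62c2eee8>`) even though each holds a different `SSLEngineImpl` lock, so separate connections do not avoid the contention. The sketch below has several threads call `nextBytes` on one shared `SecureRandom`. The class and method names are hypothetical, the 16-byte request size is an assumption (one AES block), and `NativePRNG` is only available on Unix-like JDKs, so the sketch falls back to the platform default elsewhere. If the provider serializes callers, total wall-clock time would be expected to grow with the thread count rather than stay roughly flat; actual numbers depend on the JDK, OS and hardware, so none are given here.

```java
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical micro-benchmark (not Flink or Netty code): N threads share one
 * SecureRandom and each repeatedly requests 16 random bytes.
 */
public class SecureRandomContention {

    /** Runs the workload and returns the total number of random bytes produced. */
    static long run(SecureRandom shared, int threads, int callsPerThread) {
        LongAdder bytesProduced = new LongAdder();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                byte[] buf = new byte[16]; // assumed request size: one AES block
                for (int i = 0; i < callsPerThread; i++) {
                    // All workers call the same instance; a provider with an
                    // internal lock (like NativePRNG) serializes these calls.
                    shared.nextBytes(buf);
                    bytesProduced.add(buf.length);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return bytesProduced.sum();
    }

    /** Prefers NativePRNG (Unix-like JDKs) and falls back to the platform default. */
    static SecureRandom nativeOrDefault() {
        try {
            return SecureRandom.getInstance("NativePRNG");
        } catch (NoSuchAlgorithmException e) {
            return new SecureRandom();
        }
    }

    public static void main(String[] args) {
        SecureRandom rng = nativeOrDefault();
        int callsPerThread = 100_000;
        for (int threads : new int[] {1, 4, 16, 64}) {
            long start = System.nanoTime();
            long bytes = run(rng, threads, callsPerThread);
            double millis = (System.nanoTime() - start) / 1e6;
            System.out.printf("%s threads=%d bytes=%d time=%.1f ms%n",
                    rng.getAlgorithm(), threads, bytes, millis);
        }
    }
}
```

Run it with `javac SecureRandomContention.java && java SecureRandomContention`. Note that the work per thread is held constant, so in an uncontended setup the time per row would ideally stay similar as threads are added (given enough cores); taking a thread dump while it runs should show the same `NativePRNG$RandomIO.implNextBytes` frames as above.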
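To gauge how often this happens without reading full thread dumps by hand, the signature seen above (a thread in state BLOCKED whose top frame is `sun.security.provider.NativePRNG$RandomIO.implNextBytes`) could be counted programmatically with the standard `java.lang.management` API. This is a hypothetical diagnostic helper, not part of Flink; it only inspects the JVM it runs in, so it would have to run inside the TaskManager process (or be adapted to a remote JMX connection) to observe the Netty server threads.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical diagnostic helper: counts threads blocked in NativePRNG$RandomIO. */
public class NativePrngContentionProbe {

    // Top frame of the BLOCKED Netty thread in the dumps above.
    static final String PRNG_CLASS = "sun.security.provider.NativePRNG$RandomIO";
    static final String PRNG_METHOD = "implNextBytes";

    /** True if the thread is BLOCKED and its top frame is NativePRNG$RandomIO.implNextBytes. */
    static boolean isBlockedInNativePrng(Thread.State state, StackTraceElement[] stack) {
        if (state != Thread.State.BLOCKED || stack.length == 0) {
            return false;
        }
        StackTraceElement top = stack[0];
        return PRNG_CLASS.equals(top.getClassName()) && PRNG_METHOD.equals(top.getMethodName());
    }

    /** Takes one snapshot of all live threads in this JVM and counts matching threads. */
    static int countBlockedInNativePrng() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Locked monitors and synchronizers are not needed for this check.
        ThreadInfo[] infos = mx.dumpAllThreads(false, false);
        int count = 0;
        for (ThreadInfo info : infos) {
            if (info != null && isBlockedInNativePrng(info.getThreadState(), info.getStackTrace())) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("threads blocked in NativePRNG: " + countBlockedInNativePrng());
    }
}
```

A single snapshot is noisy; sampling `countBlockedInNativePrng()` periodically and looking at the distribution would give a better sense of how much of a bottleneck the lock is at a given parallelism.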