Hello community, I'm currently struggling with an Apache Beam batch pipeline running on top of Flink. The pipeline runs smoothly in smaller environments, but in production it always fails with a `connection timeout` in one of the last shuffle phases.

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at ...

Basically, the pipeline looks as follows:

read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation, a few seconds per element) -> write to multiple outputs (~4)

- cluster size: 100 TMs
- slots per TM: 4
- data size per single job run: ranging from 100 GB to 1 TB
- job parallelism: 400

I've tried increasing the Netty `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Increasing the number of Netty threads did not help either. The JVM performs OK (no OOMs, GC pauses, ...). The connect backlog defaults to 128.

This is probably caused by Netty threads being blocked on the server side. All these threads share the same lock, so increasing the number of threads won't help.

"Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000] java.lang.Thread.State: RUNNABLE at java.lang.Number.<init>(Number.java:55) at java.lang.Long.<init>(Long.java:947) at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36) at java.lang.reflect.Field.get(Field.java:393) at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420) at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434) at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81) at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66) at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130) at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87) at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71) at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201) - locked <0x00000006d822e180> (a java.lang.Object) at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279) at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72) - locked <0x00000006cad32578> (a java.util.HashMap) at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86) - locked <0x000000079767ff38> (a java.lang.Object) at 
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102) at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42) at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) This may be related to mmap backed BoundedData implementation, where `nextBuffer` seems to be somehow expensive (reflection, skipping empty buffers?) . Just to note, last phases only shuffle metadata (kilobyte scale), but the job paralelism remains the same due to beam nature (400). Does this sound familiar to anyone? Do you have any suggestions how to solve this? Thanks for help, David
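
For a rough sense of scale, here is a back-of-the-envelope sketch of how many subpartition requests the Netty server threads might have to serve in one shuffle. It assumes an all-to-all shuffle (every consumer subtask requests one subpartition from every producer subtask) and producer subtasks spread evenly across the task managers. Both are assumptions, not something measured on the cluster, and the function name `partition_request_counts` is made up for illustration.

```python
def partition_request_counts(parallelism: int, task_managers: int) -> dict:
    """Estimate subpartition requests for one all-to-all shuffle.

    Assumptions (not verified against the real job):
      - every consumer subtask requests one subpartition from every producer
      - producer subtasks are spread evenly across the task managers
    """
    # Ceiling division: producer subtasks hosted on a single task manager.
    producers_per_tm = -(-parallelism // task_managers)
    return {
        # Each of the `parallelism` consumers asks each of the `parallelism` producers.
        "total_requests": parallelism * parallelism,
        # Requests that a single task manager's Netty server has to answer.
        "requests_per_tm": producers_per_tm * parallelism,
    }


# Numbers from the setup described above: parallelism 400, 100 task managers.
counts = partition_request_counts(parallelism=400, task_managers=100)
print(counts)  # {'total_requests': 160000, 'requests_per_tm': 1600}
```

If these assumptions hold, each task manager would serve on the order of 1600 `createSubpartitionView` calls per shuffle, and per the thread dump each such call takes the lock on the `HashMap` in `ResultPartitionManager`. That would be consistent with the timeouts showing up even when the shuffled data itself is tiny.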