Hi David,

Unfortunately, I'm not familiar with these parts of Flink, but I'm pulling in Piotr, who might be able to tell you more.

Cheers,
Till

On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:

> Hello community,
>
> I'm currently struggling with an Apache Beam batch pipeline on top of
> Flink. The pipeline runs smoothly in smaller environments, but in
> production it always ends up with `connection timeout` in one of the last
> shuffle phases.
>
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
> 	at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> 	at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> 	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)
> ...
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at ...
>
> Basically, the pipeline looks as follows:
>
> read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation,
> a few seconds per element) -> write to multiple outputs (~4)
>
> - cluster size: 100 TMs
> - slots per TM: 4
> - data size per single job run: 100 GB to 1 TB
> - job parallelism: 400
>
> I've tried increasing the Netty client connect timeout
> (`taskmanager.network.netty.client.connectTimeoutSec`) with no luck.
> Increasing the number of Netty threads did not help either. The JVM performs
> OK (no OOMs, no GC pauses, ...). The connect backlog defaults to 128.
>
> This is probably caused by Netty threads being blocked on the server side.
> All these threads share the same lock, so increasing the number of threads
> won't help.
>
> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>    java.lang.Thread.State: RUNNABLE
> 	at java.lang.Number.<init>(Number.java:55)
> 	at java.lang.Long.<init>(Long.java:947)
> 	at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
> 	at java.lang.reflect.Field.get(Field.java:393)
> 	at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
> 	at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
> 	at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
> 	at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
> 	at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
> 	at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
> 	at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
> 	- locked <0x00000006d822e180> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
> 	- locked <0x00000006cad32578> (a java.util.HashMap)
> 	at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
> 	- locked <0x000000079767ff38> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
> 	at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> 	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>
> This may be related to the mmap-backed BoundedData implementation, where
> `nextBuffer` seems to be somewhat expensive (reflection, skipping empty
> buffers?). Just to note, the last phases only shuffle metadata (kilobyte
> scale), but the job parallelism remains the same (400) due to the nature
> of Beam.
>
> Does this sound familiar to anyone?
> Do you have any suggestions on how to solve this?
>
> Thanks for help,
> David
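
To put the cluster figures in David's email in perspective, here is a back-of-the-envelope sketch (plain Java, not Flink code) of how many subpartition requests one all-to-all exchange generates at parallelism 400. It assumes that every consumer subtask requests exactly one subpartition from every producer subtask and that the 400 subtasks are spread evenly, 4 per TaskManager; the class and method names are hypothetical. The point is that the request count depends only on parallelism, so even the last phases, which shuffle only kilobytes of metadata, produce just as many partition requests.

```java
// Back-of-the-envelope fan-out for one all-to-all exchange (illustrative, not Flink code).
// Assumptions: each consumer subtask requests one subpartition from each producer
// subtask, and producer subtasks are spread evenly across TaskManagers.
public class ShuffleFanOut {

    /** Total subpartition requests for one exchange: every consumer asks every producer. */
    static long totalSubpartitionRequests(int producers, int consumers) {
        return (long) producers * consumers;
    }

    /** Requests one TaskManager serves if it hosts `producersPerTm` producer subtasks. */
    static long requestsServedPerTaskManager(int producersPerTm, int consumers) {
        return (long) producersPerTm * consumers;
    }

    public static void main(String[] args) {
        int taskManagers = 100;
        int slotsPerTm = 4;
        int parallelism = taskManagers * slotsPerTm; // 400, matching the job parallelism

        // Neither count depends on how many bytes flow through the exchange.
        System.out.println("total requests per exchange: "
                + totalSubpartitionRequests(parallelism, parallelism));   // 160000
        System.out.println("requests served per TaskManager: "
                + requestsServedPerTaskManager(slotsPerTm, parallelism)); // 1600
    }
}
```

Under these assumptions, each TaskManager's Netty server has to create about 1,600 subpartition views per exchange, whatever the data volume, and in the thread dump above each such request passes through `ResultPartitionManager.createSubpartitionView`.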
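
The argument that more Netty threads won't help when they all share one lock can be illustrated with a small, self-contained Java sketch. It is not Flink code: `SHARED_LOCK` is a hypothetical stand-in for a monitor every request must hold (in the thread dump above, `ResultPartitionManager.createSubpartitionView` holds a `java.util.HashMap` monitor while the subpartition reader is constructed underneath it), and `workMillis` stands in for the expensive work done while holding it. However many worker threads are used, at most one request is inside the lock at a time, so the total time cannot drop below requests × workMillis.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch (not Flink code): many worker threads funnelled through one monitor.
public class SharedLockDemo {

    // Hypothetical stand-in for a lock that every request has to take.
    private static final Object SHARED_LOCK = new Object();

    static final class Result {
        final int maxInsideLock;  // peak number of requests inside the critical section at once
        final long elapsedMillis; // wall-clock time to finish all requests

        Result(int maxInsideLock, long elapsedMillis) {
            this.maxInsideLock = maxInsideLock;
            this.elapsedMillis = elapsedMillis;
        }
    }

    static Result run(int threads, int requests, long workMillis) {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(requests);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            pool.execute(() -> {
                synchronized (SHARED_LOCK) {
                    maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    busyWait(workMillis); // expensive work performed while holding the lock
                    inside.decrementAndGet();
                }
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for requests", e);
        } finally {
            pool.shutdown(); // lets the worker threads exit once the queue is drained
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new Result(maxInside.get(), elapsedMillis);
    }

    // Spins to simulate CPU-bound work; the monitor stays held for the whole duration.
    private static void busyWait(long millis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        while (System.nanoTime() - deadline < 0) {
            // spin
        }
    }

    public static void main(String[] args) {
        for (int threads : new int[] {1, 4, 16}) {
            Result r = run(threads, 50, 2);
            System.out.println(threads + " threads: max inside lock = " + r.maxInsideLock
                    + ", elapsed ~" + r.elapsedMillis + " ms");
        }
    }
}
```

Running `main` should report a maximum of 1 request inside the lock for every thread count, with the elapsed time staying at roughly 50 × 2 ms or more: extra threads only add waiters.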
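
The top frames of the Netty server thread dump above show one ingredient of the `nextBuffer` cost David suspects: according to the dump, wrapping a sliced buffer in a new `HybridMemorySegment` reads the buffer's native address through `java.lang.reflect.Field.get`, which returns an `Object`, so the primitive `long` gets boxed (`Long.<init>` and `Number.<init>` in the dump). The sketch below reproduces only that mechanism on a hypothetical `FakeBuffer` class; it is not Flink's code and says nothing about how large this cost is relative to the other work done under the locks. `Field.getLong` is shown for contrast because it returns the primitive without boxing.

```java
import java.lang.reflect.Field;

// Illustrative sketch (not Flink code): reading a primitive long field reflectively.
public class ReflectiveAddressRead {

    // Hypothetical stand-in for a buffer object that keeps a native address in a long field.
    static final class FakeBuffer {
        private final long address;

        FakeBuffer(long address) {
            this.address = address;
        }
    }

    private static final Field ADDRESS_FIELD;

    static {
        try {
            ADDRESS_FIELD = FakeBuffer.class.getDeclaredField("address");
            ADDRESS_FIELD.setAccessible(true); // the field is private
        } catch (NoSuchFieldException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** Field.get returns Object, so the long value comes back boxed as a java.lang.Long. */
    static Object readBoxed(FakeBuffer buffer) {
        try {
            return ADDRESS_FIELD.get(buffer);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Field.getLong returns the primitive directly, without allocating a wrapper. */
    static long readPrimitive(FakeBuffer buffer) {
        try {
            return ADDRESS_FIELD.getLong(buffer);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeBuffer buffer = new FakeBuffer(0x7f334acf5000L);
        Object boxed = readBoxed(buffer);
        System.out.println(boxed.getClass().getName());            // java.lang.Long
        System.out.println((Long) boxed == readPrimitive(buffer)); // true
    }
}
```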