Hello community,

I'm currently struggling with an Apache Beam batch pipeline running on
Flink. The pipeline runs smoothly in smaller environments, but in
production it consistently fails with a `connection timeout` in one of the
last shuffle phases:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at ...

Basically the pipeline looks as follows:

read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation
- few secs per element) -> write to multiple outputs (~4)
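
For context, a stripped-down sketch of the pipeline shape (class name,
paths and the heavyComputation helper are placeholders, not the real code):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class SkewedBatchJob {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> records = p
        // the real sources are heavily skewed, hence the reshuffle right after the read
        .apply("ReadSkewedSources", TextIO.read().from("/path/to/skewed-input-*"))
        .apply("Reshuffle", Reshuffle.viaRandomKey());

    PCollection<String> results = records
        .apply("HeavyFlatMap", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(@Element String element, OutputReceiver<String> out) {
            // a few seconds of computation per element in the real job
            out.output(heavyComputation(element));
          }
        }));

    // the real job writes to ~4 different sinks; two are shown here
    results.apply("WriteOutput1", TextIO.write().to("/path/to/output-1"));
    results.apply("WriteOutput2", TextIO.write().to("/path/to/output-2"));

    p.run().waitUntilFinish();
  }

  private static String heavyComputation(String element) {
    return element; // placeholder for the real per-element work
  }
}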

- cluster size: 100 TMs
- slots per TM: 4
- data size per single job run: 100 GB to 1 TB
- job parallelism: 400

I've tried increasing the netty
`taskmanager.network.netty.client.connectTimeoutSec`, with no luck.
Increasing the number of netty threads did not help either. The JVM looks
healthy (no OOMs, no long GC pauses, ...). The connect backlog is still at
its default of 128. The settings I've been touching are listed below.
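
The relevant flink-conf.yaml entries currently look roughly like this (the
concrete values are just examples of what I tried, not a recommendation):

# raised well above the default, no effect:
taskmanager.network.netty.client.connectTimeoutSec: 600
# raised, no effect:
taskmanager.network.netty.server.numThreads: 16
taskmanager.network.netty.client.numThreads: 16
# not touched yet, so the effective accept backlog is still 128:
# taskmanager.network.netty.server.backlog: ...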

This is probably caused by netty threads being blocked on the server side:
while they are stuck creating subpartition views they presumably cannot
serve new partition requests, so once the accept backlog (128) fills up the
remaining clients time out on connect. All these threads contend on the
same lock, so increasing the number of threads won't help:

"Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Number.<init>(Number.java:55)
        at java.lang.Long.<init>(Long.java:947)
        at
sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
        at java.lang.reflect.Field.get(Field.java:393)
        at
org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
        at
org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
        at
org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
        at
org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
        at
org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
        at
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
        at
org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
        at
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
        at
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
        - locked <0x00000006d822e180> (a java.lang.Object)
        at
org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
        at
org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
        - locked <0x00000006cad32578> (a java.util.HashMap)
        at
org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
        - locked <0x000000079767ff38> (a java.lang.Object)
        at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
        at
org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
        at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)

This may be related to the mmap-backed BoundedData implementation, where
`nextBuffer` seems to be surprisingly expensive (reflection, skipping empty
buffers?). Just to note, the last phases only shuffle metadata (kilobyte
scale), but the job parallelism remains 400 due to how Beam translates the
pipeline, so each task manager still has to serve 400 requests per local
result partition (roughly 1,600 per TM) through that locked code path.
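
If it matters, I have not tried forcing a different BoundedData
implementation yet. I believe recent releases expose something like the
following switch to use the file-based reader instead of mmap, but I have
not verified the exact option name against our version:

# assumption, not verified on our Flink version:
taskmanager.network.bounded-blocking-subpartition-type: file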

Does this sound familiar to anyone? Do you have any suggestions on how to
solve this?

Thanks for any help,
David
