Hi David,

Unfortunately, I'm not familiar with these parts of Flink, but I'm pulling in
Piotr, who might be able to tell you more.

Cheers,
Till

On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:

> Hello community,
>
> I'm currently struggling with an Apache Beam batch pipeline on top of
> Flink. The pipeline runs smoothly in smaller environments, but in
> production it always ends up with a `connection timeout` in one of the last
> shuffle phases.
>
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
>         at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>         at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>         at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>         at java.lang.Thread.run(Thread.java:748)
> ...
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>         at ...
>
> The pipeline looks roughly as follows:
>
> read "skewed" sources -> reshuffle -> flatMap (heavy computation, a few seconds per element) -> write to multiple outputs (~4)
>
> - cluster size: 100 TaskManagers
> - slots per TaskManager: 4
> - data size per job run: 100 GB to 1 TB
> - job parallelism: 400
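A rough estimate of the fan-out implied by these numbers, as a plain-Java sketch (the class and method names are hypothetical, not Flink API). It assumes an all-to-all hash shuffle in which every producer subtask has one subpartition per consumer subtask, with producers spread evenly across TaskManagers. Under that assumption a single shuffle phase issues 400 × 400 = 160,000 subpartition requests, roughly 1,600 of them served by each TaskManager, regardless of how much data is actually shuffled.

```java
// Back-of-the-envelope estimate of subpartition requests in one all-to-all shuffle.
// Assumption: every consumer subtask requests exactly one subpartition from every
// producer subtask, and producers are spread evenly over the TaskManagers.
final class ShuffleFanOut {

    // One request per (producer, consumer) pair.
    static long totalSubpartitionRequests(int producers, int consumers) {
        return (long) producers * consumers;
    }

    // Requests served by one TaskManager; local channels are not subtracted.
    static long requestsServedPerTaskManager(int producers, int consumers, int taskManagers) {
        return totalSubpartitionRequests(producers, consumers) / taskManagers;
    }

    public static void main(String[] args) {
        int parallelism = 400;  // job parallelism from the report
        int taskManagers = 100; // cluster size from the report
        System.out.println("total requests: "
                + totalSubpartitionRequests(parallelism, parallelism));
        System.out.println("served per TM: "
                + requestsServedPerTaskManager(parallelism, parallelism, taskManagers));
    }
}
```

Local channels (producer and consumer on the same TaskManager) are ignored, so the per-TaskManager figure slightly overestimates the number of remote requests.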
>
> I've tried increasing the Netty client connect timeout
> (`taskmanager.network.netty.client.connectTimeoutSec`), with no luck.
> Increasing the number of Netty threads did not help either. The JVM performs
> fine (no OOMs, no GC pauses, ...). The connect backlog defaults to 128.
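To illustrate what the connect backlog governs, here is a plain-JDK sketch; it is not Flink's Netty server, and `BacklogDemo` is a hypothetical name. On common operating systems the kernel completes the TCP handshake and queues the connection even while the application is not calling accept(); only once that queue is full and nothing drains it do new connects stall until the client gives up. The exact overflow behaviour is OS-specific, so the sketch only demonstrates the queueing part.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Plain-JDK illustration, not Flink's Netty server. The second ServerSocket
// constructor argument is the listen backlog: the queue of connections that have
// completed the handshake but are still waiting for the application's accept().
final class BacklogDemo {

    static boolean connectWithoutAccept(int backlog, int connectTimeoutMillis) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 picks a free ephemeral port. accept() is deliberately never called,
        // loosely mimicking a server whose accepting thread is busy elsewhere.
        try (ServerSocket server = new ServerSocket(0, backlog, loopback);
             Socket client = new Socket()) {
            client.connect(new InetSocketAddress(loopback, server.getLocalPort()), connectTimeoutMillis);
            // On common OSes the kernel has completed the handshake and queued the connection.
            return client.isConnected();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("connected without accept(): " + connectWithoutAccept(128, 2000));
    }
}
```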
>
> This is probably caused by the Netty threads being blocked on the server
> side. All of these threads share the same lock, so increasing the number of
> threads won't help.
>
> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>    java.lang.Thread.State: RUNNABLE
>         at java.lang.Number.<init>(Number.java:55)
>         at java.lang.Long.<init>(Long.java:947)
>         at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>         at java.lang.reflect.Field.get(Field.java:393)
>         at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>         at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>         at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>         at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>         at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>         at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>         at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>         - locked <0x00000006d822e180> (a java.lang.Object)
>         at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>         at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>         - locked <0x00000006cad32578> (a java.util.HashMap)
>         at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>         - locked <0x000000079767ff38> (a java.lang.Object)
>         at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>         at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>         at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
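A simplified model of the contention visible in the thread dump above, written as a plain-Java sketch. The dump shows `ResultPartitionManager.createSubpartitionView` holding a `java.util.HashMap` monitor while the read view (including the first buffer slice) is created. Assuming that map is shared by all server threads of a TaskManager, as described above, every partition request is serialized on it. `SharedLockDemo` and its busy-wait are hypothetical stand-ins: however many threads are added, at most one is ever inside the critical section, so extra threads add waiting rather than throughput.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: several "server threads" must each take one shared monitor
// (playing the role of the shared map lock in the dump) before doing slow work.
// It records the maximum number of threads ever inside the critical section.
final class SharedLockDemo {

    private static final Object SHARED_LOCK = new Object();

    static int maxConcurrentHolders(int threads, int requestsPerThread) throws InterruptedException {
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int r = 0; r < requestsPerThread; r++) {
                    synchronized (SHARED_LOCK) {
                        int now = inside.incrementAndGet();
                        maxInside.accumulateAndGet(now, Math::max);
                        simulateExpensiveReadViewCreation();
                        inside.decrementAndGet();
                    }
                }
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return maxInside.get();
    }

    // Stand-in for slow work done while the lock is held (~0.1 ms busy wait).
    private static void simulateExpensiveReadViewCreation() {
        long deadline = System.nanoTime() + 100_000L;
        while (System.nanoTime() < deadline) {
            // spin
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int threads : new int[] {1, 4, 16}) {
            System.out.println(threads + " threads -> max concurrent holders: "
                    + maxConcurrentHolders(threads, 50));
        }
    }
}
```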
>
> This may be related to the mmap-backed `BoundedData` implementation, where
> `nextBuffer` seems to be somewhat expensive (reflection? skipping empty
> buffers?). Note that the last phases only shuffle metadata (kilobyte
> scale), but the job parallelism stays the same (400) due to the nature of Beam.
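The top frames of the dump (`HybridMemorySegment.getAddress` -> `Field.get` -> `UnsafeLongFieldAccessorImpl.get` -> `new Long`) show a buffer address being read via reflection, which boxes the primitive `long` on that JVM. A minimal plain-Java sketch of the difference, using a hypothetical `Holder` class rather than the JDK buffer internals: `Field.get` returns a boxed `Long`, while `Field.getLong` reads the primitive without boxing. Whether this allocation dominates the cost here is an assumption; it matters mainly because it happens while the locks shown in the dump are held.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for an object with a primitive `long address` field,
// similar in spirit to the buffer address read in HybridMemorySegment.getAddress.
final class ReflectiveLongRead {

    static final class Holder {
        private final long address;

        Holder(long address) {
            this.address = address;
        }
    }

    static Field addressField() throws NoSuchFieldException {
        Field f = Holder.class.getDeclaredField("address");
        f.setAccessible(true);
        return f;
    }

    public static void main(String[] args) throws Exception {
        Holder h = new Holder(42L);
        Field f = addressField();
        Object boxed = f.get(h);       // returns a boxed java.lang.Long (may allocate per call)
        long primitive = f.getLong(h); // reads the primitive long directly, no boxing
        System.out.println(boxed.getClass().getName() + " " + boxed + " / " + primitive);
    }
}
```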
>
> Does this sound familiar to anyone? Do you have any suggestions on how to
> solve this?
>
> Thanks for your help,
> David
>
