Hi David,

The usual cause of a connection timeout is a long deployment, for example when your 
job's jar is large and takes a long time to distribute across the cluster. I'm 
not sure whether large state could have a similar effect. Are you sure that's 
not the case?

The thing you are suggesting I haven't heard about before, but indeed it could 
theoretically happen. Reading from mmap'ed sub-partitions could block the Netty 
threads if the kernel decides to drop an mmap'ed page and it has to be re-read 
from disk. Could you check your CPU and disk I/O usage? Disk overload should be 
visible as high IOWait CPU usage (the %iowait column in the avg-cpu section of 
the iostat output, plus high %util and await on the spilling disks). Could you, 
for example, post a couple of sample results of 

        iostat -xm 2

command from a representative Task Manager? If the disks are indeed overloaded, 
changing Flink's config option

        taskmanager.network.bounded-blocking-subpartition-type

from its default to `file` could solve the problem. FYI, this option is renamed in 
1.10 to

        taskmanager.network.blocking-shuffle.type

and its default value will be `file`.
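
To make that concrete, here is a minimal flink-conf.yaml sketch of the change I have 
in mind (use only the key that matches your Flink version; the 1.10 name is shown 
for reference):

        # Flink 1.9.x: serve bounded blocking subpartitions from files instead of mmap
        taskmanager.network.bounded-blocking-subpartition-type: file

        # Flink 1.10+: the same setting under its new name (where `file` is the default)
        taskmanager.network.blocking-shuffle.type: file

Note that flink-conf.yaml is read at startup, so the Task Managers need to be 
restarted for the change to take effect.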

We would appreciate it if you could get back to us with the results!

Piotrek

> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi David,
> 
> I'm unfortunately not familiar with these parts of Flink but I'm pulling
> Piotr in who might be able to tell you more.
> 
> Cheers,
> Till
> 
> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
> 
>> Hello community,
>> 
>> I'm currently struggling with an Apache Beam batch pipeline on top of
>> Flink. The pipeline runs smoothly in smaller environments, but in
>> production it always ends up with `connection timeout` in one of the last
>> shuffle phases.
>> 
>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>> Connection for partition
>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
>>        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>        at java.lang.Thread.run(Thread.java:748)
>> ...
>> Caused by:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection timed out: ##########/10.249.28.39:25709
>>        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>        at ...
>> 
>> Basically the pipeline looks as follows:
>> 
>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation
>> - few secs per element) -> write to multiple outputs (~4)
>> 
>> - cluster size: 100 tms
>> - slots per tm: 4
>> - data size per single job run ranging from 100G to 1TB
>> - job parallelism: 400
>> 
>> I've tried to increase netty
>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Also
>> increasing # of netty threads did not help. JVM performs ok (no ooms, gc
>> pauses, ...). Connect backlog defaults to 128.
>> 
>> This is probably caused by netty threads being blocked on the server side.
>> All these threads share the same lock, so increasing the number of threads
>> won't help.
>> 
>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>>   java.lang.Thread.State: RUNNABLE
>>        at java.lang.Number.<init>(Number.java:55)
>>        at java.lang.Long.<init>(Long.java:947)
>>        at
>> 
>> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>>        at java.lang.reflect.Field.get(Field.java:393)
>>        at
>> 
>> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>>        at
>> 
>> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>>        at
>> 
>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>>        at
>> 
>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>>        at
>> 
>> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>>        - locked <0x00000006d822e180> (a java.lang.Object)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>>        at
>> org.apache.flink.runtime.io
>> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>>        - locked <0x00000006cad32578> (a java.util.HashMap)
>>        at
>> org.apache.flink.runtime.io
>> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>>        - locked <0x000000079767ff38> (a java.lang.Object)
>>        at
>> org.apache.flink.runtime.io
>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>>        at
>> org.apache.flink.runtime.io
>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>>        at
>> 
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>        at
>> 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>>        at
>> 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>>        at
>> 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> 
>> This may be related to the mmap-backed BoundedData implementation, where
>> `nextBuffer` seems to be somewhat expensive (reflection, skipping empty
>> buffers?). Just to note, the last phases only shuffle metadata (kilobyte
>> scale), but the job parallelism remains the same (400) due to the nature of Beam.
>> 
>> Does this sound familiar to anyone? Do you have any suggestions on how to
>> solve this?
>> 
>> Thanks for the help,
>> David
>> 
