Hi!

Concerning the JAR files: I don't think they have anything to do with this; it
is a batch shuffle after all, so the previous stage must have completed already.

A few things that come to my mind:
  - What Flink version are you using? 1.9?
  - Are you sure that the source TaskManager is still running? Earlier
Flink versions had an issue with releasing TMs too early, sometimes before
the result was fetched by a consumer.
  - The buffer creation on the sender / Netty server side is more expensive
than necessary, but it should be nowhere near expensive enough to cause a stall.

Can you elaborate on what the shared lock is that all server threads are
using?

Best,
Stephan


On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> > In case of large jar, wouldn't this happen in previous stages as well (if
> > so this should not be the case)?
>
> I’m not exactly sure how jars are distributed, but if they are being
> sent/uploaded from one node (or some other fixed number of nodes, e.g.
> uploaded to and read from a DFS) to all, this might not scale well. Also,
> your dev deployment might not stress the network/storage as much as the
> production deployment, which can also affect the time it takes to deploy the job.
>
> What’s your job size? (How large is the jar uploaded to Flink?)
>
> Also, there might be other factors in play here. For example, if you are
> using Flink's job mode (not standalone), the time to start up a Flink node
> might be too long: some nodes are already up and running and they time out
> while waiting for others to start up.
>
> > Also there shouldn't be any state involved
> > (unless Beam IO's use it internally).
>
> My bad. Instead of
>
> > - data size per single job run ranging from 100G to 1TB
>
> I read it as a state size of 100G to 1TB.
>
> Piotrek
>
> >
> > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com>
> wrote:
> >
> >> Hi David,
> >>
> >> The usual cause for a connection timeout is a long deployment, for example
> >> if your job's jar is large and takes a long time to distribute across the
> >> cluster. I’m not sure whether large state could affect this as well. Are
> >> you sure that’s not the case?
> >>
> >> The thing you are suggesting I haven’t heard about previously, but indeed
> >> it could theoretically happen. Reading from mmap’ed sub-partitions could
> >> block the Netty threads if the kernel decides to drop an mmap’ed page and
> >> it has to be re-read from disk. Could you check your CPU and disk IO
> >> usage? This should show up as high IOWait CPU usage. Could you, for
> >> example, post a couple of sample results of the
> >>
> >>        iostat -xm 2
> >>
> >> command from some representative TaskManager? If the disks are indeed
> >> overloaded, changing Flink’s config option
> >>
> >>        taskmanager.network.bounded-blocking-subpartition-type
> >>
> >> from its default to `file` could solve the problem. FYI, this option is
> >> renamed in 1.10 to
> >>
> >>        taskmanager.network.blocking-shuffle.type
> >>
> >> And its default value will be `file`.
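> >>
> >> For reference, that change would be a single line in flink-conf.yaml (a
> >> minimal sketch, using the 1.9 key name from above):
> >>
> >>        taskmanager.network.bounded-blocking-subpartition-type: file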
> >>
> >> We would appreciate if you could get back to us with the results!
> >>
> >> Piotrek
> >>
> >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
> >>>
> >>> Hi David,
> >>>
> >>> I'm unfortunately not familiar with these parts of Flink but I'm
> pulling
> >>> Piotr in who might be able to tell you more.
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
> >>>
> >>>> Hello community,
> >>>>
> >>>> I'm currently struggling with an Apache Beam batch pipeline on top of
> >>>> Flink. The pipeline runs smoothly in smaller environments, but in
> >>>> production it always ends up with `connection timeout` in one of the
> >> last
> >>>> shuffle phases.
> >>>>
> >>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> >>>> Connection for partition
> >>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not
> >>>> reachable.
> >>>>       at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> >>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> >>>>       at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> >>>>       at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> >>>>       at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> >>>>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> >>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> >>>>       at java.lang.Thread.run(Thread.java:748)
> >>>> ...
> >>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> >>>> Connection timed out: ##########/10.249.28.39:25709
> >>>>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> >>>>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> >>>>       at ...
> >>>>
> >>>> Basically the pipeline looks as follows:
> >>>>
> >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy
> >>>> computation, a few secs per element) -> write to multiple outputs (~4)
> >>>>
> >>>> - cluster size: 100 tms
> >>>> - slots per tm: 4
> >>>> - data size per single job run ranging from 100G to 1TB
> >>>> - job parallelism: 400
> >>>>
> >>>> I've tried to increase Netty's
> >>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Also,
> >>>> increasing the number of Netty threads did not help. The JVM performs OK
> >>>> (no OOMs, no GC pauses, ...). The connect backlog defaults to 128.
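> >>>>
> >>>> For reference, a rough sketch of the relevant flink-conf.yaml entries
> >>>> (the values below are only illustrative, not the exact production ones):
> >>>>
> >>>>        taskmanager.network.netty.client.connectTimeoutSec: 600
> >>>>        taskmanager.network.netty.server.numThreads: 8
> >>>>        taskmanager.network.netty.client.numThreads: 8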
> >>>>
> >>>> This is probably caused by Netty threads being blocked on the server
> >>>> side. All these threads share the same lock, so increasing the number of
> >>>> threads won't help.
> >>>>
> >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
> >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
> >>>>  java.lang.Thread.State: RUNNABLE
> >>>>       at java.lang.Number.<init>(Number.java:55)
> >>>>       at java.lang.Long.<init>(Long.java:947)
> >>>>       at
> >>>>
> >>>>
> >>
> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
> >>>>       at java.lang.reflect.Field.get(Field.java:393)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
> >>>>       - locked <0x00000006d822e180> (a java.lang.Object)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
> >>>>       - locked <0x00000006cad32578> (a java.util.HashMap)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
> >>>>       - locked <0x000000079767ff38> (a java.lang.Object)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
> >>>>       at
> >>>> org.apache.flink.runtime.io
> >>>>
> >>
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> >>>>       at
> >>>>
> >>>>
> >>
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> >>>>
> >>>> This may be related to the mmap-backed BoundedData implementation, where
> >>>> `nextBuffer` seems to be somewhat expensive (reflection, skipping empty
> >>>> buffers?). Just to note, the last phases only shuffle metadata (kilobyte
> >>>> scale), but the job parallelism remains the same (400) due to the nature
> >>>> of Beam.
> >>>>
> >>>> Does this sound familiar to anyone? Do you have any suggestions on how
> >>>> to solve this?
> >>>>
> >>>> Thanks for help,
> >>>> David
> >>>>
> >>
> >>
>
>
