/CC Piotr and Zhijiang

Sounds reasonable at first glance. Would like to hear Piotr's and
Zhijiang's take, though, they know that code better than me.

On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com>
wrote:

> Hi Stephan,
>
> I've actually managed to narrow problem down to blocked netty server
> threads. I'm using 1.9.1 with few custom patches
> <https://github.com/dmvk/flink/commits/1.9.1-szn>, that are not relevant
> to
> this issue.
>
> To highlight the problem, I've added these checks to ResultPartitionManager
> <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which
> measure how long "netty tasks" take to execute (monitor acquisition +
> actual execution).
>
> We indeed have a pretty busy cluster, with high load and io waits (mostly
> due to ongoing shuffles and computations). From the measurements I can see
> numbers like:
>
> createSubpartitionView: 129255ms
> createSubpartitionView: 129333ms
> createSubpartitionView: 129353ms
> createSubpartitionView: 129354ms
> createSubpartitionView: 144419ms
> createSubpartitionView: 144653ms
> createSubpartitionView: 144759ms
> createSubpartitionView: 144905ms
> releasePartition: 145218ms
> releasePartition: 145250ms
> releasePartition: 145256ms
> releasePartition: 145263ms
>
> These vary a lot, depending on what other pipelines are being
> simultaneously executed. These numbers imply that at least for 145 seconds
> (which is greater than conn. timeout), taskmanger was not able to accept
> any connection (because of netty internals
> <https://github.com/netty/netty/issues/240>).
>
> Switching to *file* backed BoundedData implementation didn't help, because
> there are still heavy IO ops being executed by netty threads when monitor
> is acquired (eg. deletion of backing file).
>
> 1) I've tried to make more fine-grained locking (locking as single
> partition insead of partitionManager). This helped a little, but pipeline
> is still unable to finish in some cases.
>
> 2) Currently I'm trying to completely remove locking from
> ResultPartitionManager, as it is seems only relevant for internal
> datastructure access (can be replaced with java.concurrent). I think this
> should also have an impact to overall job completition times in some cases.
>
> What do you think?
>
> On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org> wrote:
>
> > Hi!
> >
> > Concerning JAR files: I think this has nothing to do with it, it is a
> batch
> > shuffle after all. The previous stage must have completed already.
> >
> > A few things that come to my mind:
> >   - What Flink version are you using? 1.9?
> >   - Are you sure that the source TaskManager is still running? Earlier
> > Flink versions had an issue with releasing TMs too early, sometimes
> before
> > the result was fetched by a consumer.
> >   - The buffer creation on the sender / netty server side is more
> expensive
> > than necessary, but should be nowhere near as expensive to cause a stall.
> >
> > Can you elaborate on what the shared lock is that all server threads are
> > using?
> >
> > Best,
> > Stephan
> >
> >
> > On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi,
> > >
> > > > In case of large jar, wouldn't this happen in previous stages as well
> > (if
> > > > so this should not be the case)?
> > >
> > > I’m not exactly sure how jars are distributed, but if they are being
> > > sent/uploaded from one (or some other static/fixed number, like
> uploading
> > > to and reading from a DFS) node to all, this might not scale well. Also
> > > your dev deployment might not be stressing network/storage/something as
> > > much as production deployment, which can also affect time to deploy the
> > job.
> > >
> > > What’s yours job size? (How large is the jar uploaded to Flink?)
> > >
> > > Also there might be other factors in play here, like if you are using
> > > Flink job mode (not stand alone), time to start up a Flink node might
> be
> > > too long. Some nodes are already up and running and they are time
> outing
> > > waiting for others to start up.
> > >
> > > > Also there shouldn't be any state involved
> > > > (unless Beam IO's use it internally).
> > >
> > > My bad. Instead of
> > >
> > > > - data size per single job run ranging from 100G to 1TB
> > >
> > > I read state size 100G to 1TB.
> > >
> > > Piotrek
> > >
> > > >
> > > > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com
> >
> > > wrote:
> > > >
> > > >> Hi David,
> > > >>
> > > >> The usual cause for connection time out is long deployment. For
> > example
> > > if
> > > >> your Job's jar is large and takes long time to distribute across the
> > > >> cluster. I’m not sure if large state could affect this as well or
> not.
> > > Are
> > > >> you sure that’s not the case?
> > > >>
> > > >> The think you are suggesting, I haven’t heard about previously, but
> > > indeed
> > > >> theoretically it could happen. Reading from mmap’ed sub partitions
> > could
> > > >> block the Netty threads if kernel decides to drop mmap’ed page and
> it
> > > has
> > > >> to be read from the disks. Could you check your CPU and disks IO
> > usage?
> > > >> This should be visible by high IOWait CPU usage. Could you for
> example
> > > post
> > > >> couple of sample results of
> > > >>
> > > >>        iostat -xm 2
> > > >>
> > > >> command from some representative Task Manager? If indeed disks are
> > > >> overloaded, changing Flink’s config option
> > > >>
> > > >>        taskmanager.network.bounded-blocking-subpartition-type
> > > >>
> > > >> From default to `file` could solve the problem. FYI, this option is
> > > >> renamed in 1.10 to
> > > >>
> > > >>        taskmanager.network.blocking-shuffle.type
> > > >>
> > > >> And it’s default value will be `file`.
> > > >>
> > > >> We would appreciate if you could get back to us with the results!
> > > >>
> > > >> Piotrek
> > > >>
> > > >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org>
> > wrote:
> > > >>>
> > > >>> Hi David,
> > > >>>
> > > >>> I'm unfortunately not familiar with these parts of Flink but I'm
> > > pulling
> > > >>> Piotr in who might be able to tell you more.
> > > >>>
> > > >>> Cheers,
> > > >>> Till
> > > >>>
> > > >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org>
> > wrote:
> > > >>>
> > > >>>> Hello community,
> > > >>>>
> > > >>>> I'm currently struggling with an Apache Beam batch pipeline on top
> > of
> > > >>>> Flink. The pipeline runs smoothly in smaller environments, but in
> > > >>>> production it always ends up with `connection timeout` in one of
> the
> > > >> last
> > > >>>> shuffle phases.
> > > >>>>
> > > >>>> org.apache.flink.runtime.io
> > > >>>> .network.partition.consumer.PartitionConnectionException:
> > > >>>> Connection for partition
> > > >>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f
> > not
> > > >>>> reachable.
> > > >>>>       at org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> > > >>>>       at org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> > > >>>>       at org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> > > >>>>       at
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> > > >>>>       at
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> > > >>>>       at
> > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> > > >>>>       at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > >>>>       at java.lang.Thread.run(Thread.java:748)
> > > >>>> ...
> > > >>>> Caused by:
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> > > >>>> Connection timed out: ##########/10.249.28.39:25709
> > > >>>>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > > >>>>       at
> > > >>>>
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > >>>>       at ...
> > > >>>>
> > > >>>> Basically the pipeline looks as follows:
> > > >>>>
> > > >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy
> > > >> computation
> > > >>>> - few secs per element) -> write to multiple outputs (~4)
> > > >>>>
> > > >>>> - cluster size: 100 tms
> > > >>>> - slots per tm: 4
> > > >>>> - data size per single job run ranging from 100G to 1TB
> > > >>>> - job paralelism: 400
> > > >>>>
> > > >>>> I've tried to increase netty
> > > >>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck.
> > > Also
> > > >>>> increasing # of netty threads did not help. JVM performs ok (no
> > ooms,
> > > gc
> > > >>>> pauses, ...). Connect backlog defaults to 128.
> > > >>>>
> > > >>>> This is probably caused by netty threads being blocked on the
> server
> > > >> side.
> > > >>>> All these threads share the same lock, so increasing number of
> > threads
> > > >>>> won't help.
> > > >>>>
> > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
> > > >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
> > > >>>>  java.lang.Thread.State: RUNNABLE
> > > >>>>       at java.lang.Number.<init>(Number.java:55)
> > > >>>>       at java.lang.Long.<init>(Long.java:947)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
> > > >>>>       at java.lang.reflect.Field.get(Field.java:393)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
> > > >>>>       - locked <0x00000006d822e180> (a java.lang.Object)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
> > > >>>>       - locked <0x00000006cad32578> (a java.util.HashMap)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
> > > >>>>       - locked <0x000000079767ff38> (a java.lang.Object)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
> > > >>>>       at
> > > >>>> org.apache.flink.runtime.io
> > > >>>>
> > > >>
> > >
> >
> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> > > >>>>       at
> > > >>>>
> > > >>>>
> > > >>
> > >
> >
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> > > >>>>
> > > >>>> This may be related to mmap backed BoundedData implementation,
> where
> > > >>>> `nextBuffer` seems to be somehow expensive (reflection, skipping
> > empty
> > > >>>> buffers?) . Just to note, last phases only shuffle metadata
> > (kilobyte
> > > >>>> scale), but the job paralelism remains the same due to beam nature
> > > >> (400).
> > > >>>>
> > > >>>> Does this sound familiar to anyone? Do you have any suggestions
> how
> > to
> > > >>>> solve this?
> > > >>>>
> > > >>>> Thanks for help,
> > > >>>> David
> > > >>>>
> > > >>
> > > >>
> > >
> > >
> >
>

Reply via email to