Just to clarify, these are bare metal nodes (128G ram, 16 cpus +
hyperthreading, 4xHDDS, 10g network), which run yarn, hdfs and hbase.

D.

On Wed, Jan 29, 2020 at 5:03 PM David Morávek <david.mora...@gmail.com>
wrote:

> Hi Piotr,
>
> removal of buffer prefetch in BoundedBlockingSubpartitionReader did not
> help, I've already tried that (there are still other problematic code
> paths, eg. releasePartition). I think it's perfectly ok to perform IO ops
> in netty threads, we just have to make sure, we can leverage multiple
> threads at once. Synchronization in ResultPartitionManager effectively
> decreases parallelism to one, and "netty tasks / unprocessed messages" keep
> piling up.
>
> Removing synchronization *did solve* the problem for me, because it
> allows flink to leverage the whole netty event loop pool and it's ok to
> have a single thread blocked for a little while (we still can accept
> connections with other threads).
>
> Let me think about how to get a relevant cpu graph from the TM, it's kind
> of hard to target a "defective node". Anyway attached are some graphs from
> such a busy node in time of failure.
>
> Is there any special reason for the synchronization I don't see? I have a
> feeling it's only for sychronizing `registredPartitions` map access and
> that it's perfectly ok not to synchronize `createSubpartitionView` and
> `releasePartition` calls.
>
> Thanks,
> D.
>
> On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi David,
>>
>> > with high load and io waits
>>
>> How high values are talking about?
>>
>> Could you attach a CPU profiler and post the results somehow? Which
>> threads are busy on what call trees?
>>
>>
>>
>>
>> Regarding the idea of removing of the locks in the
>> `ResultPartitionManager`. I guess it could help a bit, but I wouldn’t
>> expect it to solve the problem fully.
>>
>> Doing a blocking IO in the Netty threads is already asking for troubles,
>> as even without locks it can block and crash many things (like
>> createSubpartitionView). But one improvement that we might take a look, is
>> that `BoundedBlockingSubpartitionReader` constructor is doing a blocking
>> calls on pre-fetching the buffer. If not for that, code inside
>> `ResultPartitionManager` would probably be non blocking, or at least much
>> less blocking.
>>
>> Piotrek
>>
>> > On 29 Jan 2020, at 14:05, Stephan Ewen <se...@apache.org> wrote:
>> >
>> > /CC Piotr and Zhijiang
>> >
>> > Sounds reasonable at first glance. Would like to hear Piotr's and
>> Zhijiang's take, though, they know that code better than me.
>> >
>> > On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com
>> <mailto:david.mora...@gmail.com>> wrote:
>> > Hi Stephan,
>> >
>> > I've actually managed to narrow problem down to blocked netty server
>> > threads. I'm using 1.9.1 with few custom patches
>> > <https://github.com/dmvk/flink/commits/1.9.1-szn <
>> https://github.com/dmvk/flink/commits/1.9.1-szn>>, that are not relevant
>> to
>> > this issue.
>> >
>> > To highlight the problem, I've added these checks to
>> ResultPartitionManager
>> > <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d <
>> https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>>, which
>> > measure how long "netty tasks" take to execute (monitor acquisition +
>> > actual execution).
>> >
>> > We indeed have a pretty busy cluster, with high load and io waits
>> (mostly
>> > due to ongoing shuffles and computations). From the measurements I can
>> see
>> > numbers like:
>> >
>> > createSubpartitionView: 129255ms
>> > createSubpartitionView: 129333ms
>> > createSubpartitionView: 129353ms
>> > createSubpartitionView: 129354ms
>> > createSubpartitionView: 144419ms
>> > createSubpartitionView: 144653ms
>> > createSubpartitionView: 144759ms
>> > createSubpartitionView: 144905ms
>> > releasePartition: 145218ms
>> > releasePartition: 145250ms
>> > releasePartition: 145256ms
>> > releasePartition: 145263ms
>> >
>> > These vary a lot, depending on what other pipelines are being
>> > simultaneously executed. These numbers imply that at least for 145
>> seconds
>> > (which is greater than conn. timeout), taskmanger was not able to accept
>> > any connection (because of netty internals
>> > <https://github.com/netty/netty/issues/240 <
>> https://github.com/netty/netty/issues/240>>).
>> >
>> > Switching to *file* backed BoundedData implementation didn't help,
>> because
>> > there are still heavy IO ops being executed by netty threads when
>> monitor
>> > is acquired (eg. deletion of backing file).
>> >
>> > 1) I've tried to make more fine-grained locking (locking as single
>> > partition insead of partitionManager). This helped a little, but
>> pipeline
>> > is still unable to finish in some cases.
>> >
>> > 2) Currently I'm trying to completely remove locking from
>> > ResultPartitionManager, as it is seems only relevant for internal
>> > datastructure access (can be replaced with java.concurrent). I think
>> this
>> > should also have an impact to overall job completition times in some
>> cases.
>> >
>> > What do you think?
>> >
>> > On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org <mailto:
>> se...@apache.org>> wrote:
>> >
>> > > Hi!
>> > >
>> > > Concerning JAR files: I think this has nothing to do with it, it is a
>> batch
>> > > shuffle after all. The previous stage must have completed already.
>> > >
>> > > A few things that come to my mind:
>> > >   - What Flink version are you using? 1.9?
>> > >   - Are you sure that the source TaskManager is still running? Earlier
>> > > Flink versions had an issue with releasing TMs too early, sometimes
>> before
>> > > the result was fetched by a consumer.
>> > >   - The buffer creation on the sender / netty server side is more
>> expensive
>> > > than necessary, but should be nowhere near as expensive to cause a
>> stall.
>> > >
>> > > Can you elaborate on what the shared lock is that all server threads
>> are
>> > > using?
>> > >
>> > > Best,
>> > > Stephan
>> > >
>> > >
>> > > On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com
>> <mailto:pi...@ververica.com>>
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > > In case of large jar, wouldn't this happen in previous stages as
>> well
>> > > (if
>> > > > > so this should not be the case)?
>> > > >
>> > > > I’m not exactly sure how jars are distributed, but if they are being
>> > > > sent/uploaded from one (or some other static/fixed number, like
>> uploading
>> > > > to and reading from a DFS) node to all, this might not scale well.
>> Also
>> > > > your dev deployment might not be stressing
>> network/storage/something as
>> > > > much as production deployment, which can also affect time to deploy
>> the
>> > > job.
>> > > >
>> > > > What’s yours job size? (How large is the jar uploaded to Flink?)
>> > > >
>> > > > Also there might be other factors in play here, like if you are
>> using
>> > > > Flink job mode (not stand alone), time to start up a Flink node
>> might be
>> > > > too long. Some nodes are already up and running and they are time
>> outing
>> > > > waiting for others to start up.
>> > > >
>> > > > > Also there shouldn't be any state involved
>> > > > > (unless Beam IO's use it internally).
>> > > >
>> > > > My bad. Instead of
>> > > >
>> > > > > - data size per single job run ranging from 100G to 1TB
>> > > >
>> > > > I read state size 100G to 1TB.
>> > > >
>> > > > Piotrek
>> > > >
>> > > > >
>> > > > > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <
>> pi...@ververica.com <mailto:pi...@ververica.com>>
>> > > > wrote:
>> > > > >
>> > > > >> Hi David,
>> > > > >>
>> > > > >> The usual cause for connection time out is long deployment. For
>> > > example
>> > > > if
>> > > > >> your Job's jar is large and takes long time to distribute across
>> the
>> > > > >> cluster. I’m not sure if large state could affect this as well
>> or not.
>> > > > Are
>> > > > >> you sure that’s not the case?
>> > > > >>
>> > > > >> The think you are suggesting, I haven’t heard about previously,
>> but
>> > > > indeed
>> > > > >> theoretically it could happen. Reading from mmap’ed sub
>> partitions
>> > > could
>> > > > >> block the Netty threads if kernel decides to drop mmap’ed page
>> and it
>> > > > has
>> > > > >> to be read from the disks. Could you check your CPU and disks IO
>> > > usage?
>> > > > >> This should be visible by high IOWait CPU usage. Could you for
>> example
>> > > > post
>> > > > >> couple of sample results of
>> > > > >>
>> > > > >>        iostat -xm 2
>> > > > >>
>> > > > >> command from some representative Task Manager? If indeed disks
>> are
>> > > > >> overloaded, changing Flink’s config option
>> > > > >>
>> > > > >>        taskmanager.network.bounded-blocking-subpartition-type
>> > > > >>
>> > > > >> From default to `file` could solve the problem. FYI, this option
>> is
>> > > > >> renamed in 1.10 to
>> > > > >>
>> > > > >>        taskmanager.network.blocking-shuffle.type
>> > > > >>
>> > > > >> And it’s default value will be `file`.
>> > > > >>
>> > > > >> We would appreciate if you could get back to us with the results!
>> > > > >>
>> > > > >> Piotrek
>> > > > >>
>> > > > >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org
>> <mailto:trohrm...@apache.org>>
>> > > wrote:
>> > > > >>>
>> > > > >>> Hi David,
>> > > > >>>
>> > > > >>> I'm unfortunately not familiar with these parts of Flink but I'm
>> > > > pulling
>> > > > >>> Piotr in who might be able to tell you more.
>> > > > >>>
>> > > > >>> Cheers,
>> > > > >>> Till
>> > > > >>>
>> > > > >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org
>> <mailto:d...@apache.org>>
>> > > wrote:
>> > > > >>>
>> > > > >>>> Hello community,
>> > > > >>>>
>> > > > >>>> I'm currently struggling with an Apache Beam batch pipeline on
>> top
>> > > of
>> > > > >>>> Flink. The pipeline runs smoothly in smaller environments, but
>> in
>> > > > >>>> production it always ends up with `connection timeout` in one
>> of the
>> > > > >> last
>> > > > >>>> shuffle phases.
>> > > > >>>>
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>> .network.partition.consumer.PartitionConnectionException:
>> > > > >>>> Connection for partition
>> > > > >>>>
>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f
>> > > not
>> > > > >>>> reachable.
>> > > > >>>>       at org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>> > > > >>>>       at org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>> > > > >>>>       at org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>> > > > >>>>       at
>> > > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>> > > > >>>>       at
>> > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> > > > >>>>       at java.lang.Thread.run(Thread.java:748)
>> > > > >>>> ...
>> > > > >>>> Caused by:
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> > > > >>>> Connection timed out: ##########/10.249.28.39:25709 <
>> http://10.249.28.39:25709/>
>> > > > >>>>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native
>> Method)
>> > > > >>>>       at
>> > > > >>>>
>> > > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>> > > > >>>>       at ...
>> > > > >>>>
>> > > > >>>> Basically the pipeline looks as follows:
>> > > > >>>>
>> > > > >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy
>> > > > >> computation
>> > > > >>>> - few secs per element) -> write to multiple outputs (~4)
>> > > > >>>>
>> > > > >>>> - cluster size: 100 tms
>> > > > >>>> - slots per tm: 4
>> > > > >>>> - data size per single job run ranging from 100G to 1TB
>> > > > >>>> - job paralelism: 400
>> > > > >>>>
>> > > > >>>> I've tried to increase netty
>> > > > >>>> `taskmanager.network.netty.client.connectTimeoutSec` with no
>> luck.
>> > > > Also
>> > > > >>>> increasing # of netty threads did not help. JVM performs ok (no
>> > > ooms,
>> > > > gc
>> > > > >>>> pauses, ...). Connect backlog defaults to 128.
>> > > > >>>>
>> > > > >>>> This is probably caused by netty threads being blocked on the
>> server
>> > > > >> side.
>> > > > >>>> All these threads share the same lock, so increasing number of
>> > > threads
>> > > > >>>> won't help.
>> > > > >>>>
>> > > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
>> > > > >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>> > > > >>>>  java.lang.Thread.State: RUNNABLE
>> > > > >>>>       at java.lang.Number.<init>(Number.java:55)
>> > > > >>>>       at java.lang.Long.<init>(Long.java:947)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>> > > > >>>>       at java.lang.reflect.Field.get(Field.java:393)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>> > > > >>>>       - locked <0x00000006d822e180> (a java.lang.Object)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>> > > > >>>>       - locked <0x00000006cad32578> (a java.util.HashMap)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>> > > > >>>>       - locked <0x000000079767ff38> (a java.lang.Object)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>> > > > >>>>       at
>> > > > >>>> org.apache.flink.runtime.io <
>> http://org.apache.flink.runtime.io/>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> > > > >>>>       at
>> > > > >>>>
>> > > > >>>>
>> > > > >>
>> > > >
>> > >
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> > > > >>>>
>> > > > >>>> This may be related to mmap backed BoundedData implementation,
>> where
>> > > > >>>> `nextBuffer` seems to be somehow expensive (reflection,
>> skipping
>> > > empty
>> > > > >>>> buffers?) . Just to note, last phases only shuffle metadata
>> > > (kilobyte
>> > > > >>>> scale), but the job paralelism remains the same due to beam
>> nature
>> > > > >> (400).
>> > > > >>>>
>> > > > >>>> Does this sound familiar to anyone? Do you have any
>> suggestions how
>> > > to
>> > > > >>>> solve this?
>> > > > >>>>
>> > > > >>>> Thanks for help,
>> > > > >>>> David
>> > > > >>>>
>> > > > >>
>> > > > >>
>> > > >
>> > > >
>> > >
>>
>>

Reply via email to