Hi Piotr, removing the buffer prefetch in BoundedBlockingSubpartitionReader did not help; I had already tried that (there are still other problematic code paths, e.g. releasePartition). I think it's perfectly OK to perform IO ops in netty threads, we just have to make sure we can leverage multiple threads at once. Synchronization in ResultPartitionManager effectively decreases parallelism to one, and "netty tasks / unprocessed messages" keep piling up.
Removing synchronization *did solve* the problem for me, because it allows Flink to leverage the whole netty event loop pool, and it's OK to have a single thread blocked for a little while (we can still accept connections with other threads). Let me think about how to get a relevant CPU graph from the TM; it's kind of hard to target a "defective node". Anyway, attached are some graphs from such a busy node at the time of failure. Is there any special reason for the synchronization that I don't see? I have a feeling it's only there to synchronize access to the `registeredPartitions` map and that it's perfectly OK not to synchronize the `createSubpartitionView` and `releasePartition` calls. Thanks, D.

On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi David,
>
> > with high load and io waits
>
> How high are the values we are talking about?
>
> Could you attach a CPU profiler and post the results somehow? Which threads are busy on what call trees?
>
> Regarding the idea of removing the locks in the `ResultPartitionManager`: I guess it could help a bit, but I wouldn't expect it to solve the problem fully.
>
> Doing blocking IO in the netty threads is already asking for trouble, as even without locks it can block and crash many things (like createSubpartitionView). But one improvement that we might take a look at is that the `BoundedBlockingSubpartitionReader` constructor is doing blocking calls to pre-fetch the buffer. If not for that, the code inside `ResultPartitionManager` would probably be non-blocking, or at least much less blocking.
>
> Piotrek
>
> > On 29 Jan 2020, at 14:05, Stephan Ewen <se...@apache.org> wrote:
> >
> > /CC Piotr and Zhijiang
> >
> > Sounds reasonable at first glance. Would like to hear Piotr's and Zhijiang's take, though, they know that code better than me.
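To make the proposal concrete, here is a minimal sketch of the idea David describes: guard only the registry map with a concurrent collection, so a blocking `createSubpartitionView` on one partition no longer serializes every netty server thread behind a single monitor. This is an illustration with hypothetical names, not the actual Flink `ResultPartitionManager` code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch (not the actual Flink class): the only shared state is
// the registry map, which ConcurrentHashMap protects on its own; blocking IO
// inside view creation then delays only callers of the SAME partition.
public class LockFreeRegistrySketch {

    interface View {}

    static final class Partition {
        final String id;
        Partition(String id) { this.id = id; }

        // Potentially blocking IO would happen here; with no manager-wide
        // lock, it cannot stall requests for other partitions.
        View createView(int subpartitionIndex) {
            return new View() {};
        }
    }

    private final ConcurrentMap<String, Partition> registeredPartitions =
            new ConcurrentHashMap<>();

    void registerPartition(Partition p) {
        registeredPartitions.put(p.id, p);
    }

    View createSubpartitionView(String partitionId, int index) {
        Partition p = registeredPartitions.get(partitionId); // lock-free lookup
        if (p == null) {
            throw new IllegalStateException("Unknown partition: " + partitionId);
        }
        return p.createView(index); // IO runs outside any global monitor
    }

    Partition releasePartition(String partitionId) {
        return registeredPartitions.remove(partitionId); // atomic removal
    }
}
```

The design choice mirrors the email: per-partition blocking is tolerated because other event-loop threads stay free to accept connections; only the map itself needs thread safety.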
> >
> > On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com> wrote:
> > Hi Stephan,
> >
> > I've actually managed to narrow the problem down to blocked netty server threads. I'm using 1.9.1 with a few custom patches <https://github.com/dmvk/flink/commits/1.9.1-szn>, which are not relevant to this issue.
> >
> > To highlight the problem, I've added these checks to ResultPartitionManager <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which measure how long "netty tasks" take to execute (monitor acquisition + actual execution).
> >
> > We indeed have a pretty busy cluster, with high load and IO waits (mostly due to ongoing shuffles and computations). From the measurements I can see numbers like:
> >
> > createSubpartitionView: 129255ms
> > createSubpartitionView: 129333ms
> > createSubpartitionView: 129353ms
> > createSubpartitionView: 129354ms
> > createSubpartitionView: 144419ms
> > createSubpartitionView: 144653ms
> > createSubpartitionView: 144759ms
> > createSubpartitionView: 144905ms
> > releasePartition: 145218ms
> > releasePartition: 145250ms
> > releasePartition: 145256ms
> > releasePartition: 145263ms
> >
> > These vary a lot, depending on what other pipelines are being executed simultaneously. These numbers imply that for at least 145 seconds (which is greater than the connection timeout), the taskmanager was not able to accept any connection (because of netty internals <https://github.com/netty/netty/issues/240>).
> >
> > Switching to the *file* backed BoundedData implementation didn't help, because there are still heavy IO ops being executed by netty threads while the monitor is held (e.g. deletion of the backing file).
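The timing checks described above (the real patch is in the linked gist) can be sketched as a small wrapper that measures monitor acquisition plus execution of a "netty task" and logs when it exceeds a budget. Names here are illustrative, not from the gist.

```java
// Hypothetical sketch of the measurement: wall-clock time around the task,
// which includes any time spent blocked on a contended monitor inside it.
public class TimedSection {

    static long runTimed(String name, long warnThresholdMillis, Runnable task) {
        long start = System.nanoTime();
        task.run(); // monitor acquisition + actual execution
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000L;
        if (elapsedMillis > warnThresholdMillis) {
            System.err.println(name + ": " + elapsedMillis + "ms");
        }
        return elapsedMillis;
    }
}
```

Usage would look like `TimedSection.runTimed("createSubpartitionView", 1000, () -> manager.createSubpartitionView(...))`, producing the kind of `createSubpartitionView: 129255ms` lines quoted above.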
> >
> > 1) I've tried more fine-grained locking (locking a single partition instead of the partitionManager). This helped a little, but the pipeline is still unable to finish in some cases.
> >
> > 2) Currently I'm trying to completely remove locking from ResultPartitionManager, as it seems only relevant for internal data structure access (which can be replaced with java.util.concurrent). I think this should also have an impact on overall job completion times in some cases.
> >
> > What do you think?
> >
> > On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org> wrote:
> >
> > > Hi!
> > >
> > > Concerning JAR files: I think this has nothing to do with it, it is a batch shuffle after all. The previous stage must have completed already.
> > >
> > > A few things that come to my mind:
> > >   - What Flink version are you using? 1.9?
> > >   - Are you sure that the source TaskManager is still running? Earlier Flink versions had an issue with releasing TMs too early, sometimes before the result was fetched by a consumer.
> > >   - The buffer creation on the sender / netty server side is more expensive than necessary, but should be nowhere near expensive enough to cause a stall.
> > >
> > > Can you elaborate on what the shared lock is that all server threads are using?
> > >
> > > Best,
> > > Stephan
> > >
> > > On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > > In case of a large jar, wouldn't this happen in previous stages as well (if so, this should not be the case)?
> > > >
> > > > I'm not exactly sure how jars are distributed, but if they are being sent/uploaded from one node (or some other static/fixed number, like uploading to and reading from a DFS) to all, this might not scale well.
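Attempt (1) above, fine-grained per-partition locking, could be sketched like this: one lock per partition id instead of one monitor for the whole manager, so requests for two different partitions never block each other. This is a hypothetical helper, not code from the Flink repository.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical sketch of per-partition locking: computeIfAbsent atomically
// creates one lock per partition id, so contention is scoped to a partition.
public class PerPartitionLocking {

    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    <T> T withPartitionLock(String partitionId, Supplier<T> action) {
        ReentrantLock lock = locks.computeIfAbsent(partitionId, id -> new ReentrantLock());
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    // On partition release, drop the entry to avoid leaking one lock per id.
    void forgetPartition(String partitionId) {
        locks.remove(partitionId);
    }
}
```

As the email notes, this only helped a little: a single slow partition can still pin a netty thread for the full duration of its IO, which is why attempt (2) goes further and removes the locking entirely.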
> > > > Also your dev deployment might not be stressing network/storage/something as much as the production deployment, which can also affect the time to deploy the job.
> > > >
> > > > What's your job size? (How large is the jar uploaded to Flink?)
> > > >
> > > > Also there might be other factors in play here, like if you are using Flink job mode (not standalone), the time to start up a Flink node might be too long. Some nodes are already up and running and they time out waiting for others to start up.
> > > >
> > > > > Also there shouldn't be any state involved (unless Beam IOs use it internally).
> > > >
> > > > My bad. Instead of
> > > >
> > > > > - data size per single job run ranging from 100G to 1TB
> > > >
> > > > I read state size 100G to 1TB.
> > > >
> > > > Piotrek
> > > >
> > > > > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com> wrote:
> > > > >
> > > > >> Hi David,
> > > > >>
> > > > >> The usual cause for a connection timeout is a long deployment. For example if your job's jar is large and takes a long time to distribute across the cluster. I'm not sure whether large state could affect this as well or not. Are you sure that's not the case?
> > > > >>
> > > > >> The thing you are suggesting I haven't heard about previously, but indeed theoretically it could happen. Reading from mmap'ed sub-partitions could block the netty threads if the kernel decides to drop an mmap'ed page and it has to be read from disk. Could you check your CPU and disk IO usage? This should be visible as high IOWait CPU usage. Could you for example post a couple of sample results of
> > > > >>
> > > > >> iostat -xm 2
> > > > >>
> > > > >> from some representative Task Manager?
> > > > >> If indeed the disks are overloaded, changing Flink's config option
> > > > >>
> > > > >> taskmanager.network.bounded-blocking-subpartition-type
> > > > >>
> > > > >> from the default to `file` could solve the problem. FYI, this option is renamed in 1.10 to
> > > > >>
> > > > >> taskmanager.network.blocking-shuffle.type
> > > > >>
> > > > >> and its default value will be `file`.
> > > > >>
> > > > >> We would appreciate it if you could get back to us with the results!
> > > > >>
> > > > >> Piotrek
> > > > >>
> > > > >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
> > > > >>>
> > > > >>> Hi David,
> > > > >>>
> > > > >>> I'm unfortunately not familiar with these parts of Flink but I'm pulling in Piotr, who might be able to tell you more.
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Till
> > > > >>>
> > > > >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
> > > > >>>
> > > > >>>> Hello community,
> > > > >>>>
> > > > >>>> I'm currently struggling with an Apache Beam batch pipeline on top of Flink. The pipeline runs smoothly in smaller environments, but in production it always ends up with a `connection timeout` in one of the last shuffle phases.
> > > > >>>>
> > > > >>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> > > > >>>>         at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> > > > >>>>         at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> > > > >>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> > > > >>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > > >>>>         at java.lang.Thread.run(Thread.java:748)
> > > > >>>> ...
> > > > >>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
> > > > >>>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > > > >>>>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> > > > >>>>         at ...
> > > > >>>>
> > > > >>>> Basically the pipeline looks as follows:
> > > > >>>>
> > > > >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation, a few secs per element) -> write to multiple outputs (~4)
> > > > >>>>
> > > > >>>> - cluster size: 100 TMs
> > > > >>>> - slots per TM: 4
> > > > >>>> - data size per single job run ranging from 100G to 1TB
> > > > >>>> - job parallelism: 400
> > > > >>>>
> > > > >>>> I've tried to increase netty's `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Also increasing the # of netty threads did not help. The JVM performs OK (no OOMs, GC pauses, ...). The connect backlog defaults to 128.
> > > > >>>>
> > > > >>>> This is probably caused by netty threads being blocked on the server side. All these threads share the same lock, so increasing the number of threads won't help.
> > > > >>>>
> > > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
> > > > >>>>    java.lang.Thread.State: RUNNABLE
> > > > >>>>         at java.lang.Number.<init>(Number.java:55)
> > > > >>>>         at java.lang.Long.<init>(Long.java:947)
> > > > >>>>         at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
> > > > >>>>         at java.lang.reflect.Field.get(Field.java:393)
> > > > >>>>         at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
> > > > >>>>         at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
> > > > >>>>         at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
> > > > >>>>         at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
> > > > >>>>         at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
> > > > >>>>         - locked <0x00000006d822e180> (a java.lang.Object)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
> > > > >>>>         at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
> > > > >>>>         - locked <0x00000006cad32578> (a java.util.HashMap)
> > > > >>>>         at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
> > > > >>>>         - locked <0x000000079767ff38> (a java.lang.Object)
> > > > >>>>         at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
> > > > >>>>         at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
> > > > >>>>         at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> > > > >>>>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> > > > >>>>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> > > > >>>>         at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> > > > >>>>
> > > > >>>> This may be related to the mmap-backed BoundedData implementation, where `nextBuffer` seems to be somewhat expensive (reflection, skipping empty buffers?). Just to note, the last phases only shuffle metadata (kilobyte scale), but the job parallelism remains the same (400) due to Beam's nature.
> > > > >>>>
> > > > >>>> Does this sound familiar to anyone? Do you have any suggestions how to solve this?
> > > > >>>>
> > > > >>>> Thanks for help,
> > > > >>>> David
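A footnote on the HybridMemorySegment frames in the thread dump above (`Field.get` -> `UnsafeLongFieldAccessorImpl.get` -> `Long.<init>`): the cost pattern they suggest is a reflective long-field read per constructed segment, which goes through a field accessor and boxes the result. The sketch below illustrates that pattern with a stand-in class; it is not Flink code, and the names are hypothetical.

```java
import java.lang.reflect.Field;

// Illustration of the suspected cost: a reflective per-object field read boxes
// to java.lang.Long (matching the Long.<init> frame in the dump) and is far
// heavier than a direct read; done under a contended monitor, it stretches the
// lock hold time.
public class ReflectiveFieldCost {

    static final class Segment {
        private final long address;
        Segment(long address) { this.address = address; }
        long directRead() { return address; }
    }

    static long reflectiveRead(Segment s) {
        try {
            Field f = Segment.class.getDeclaredField("address");
            f.setAccessible(true);
            return (Long) f.get(s); // boxes, then unboxes
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Both reads return the same value; only the reflective path allocates and goes through the accessor machinery on every call.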