/CC Piotr and Zhijiang

Sounds reasonable at first glance. I'd like to hear Piotr's and Zhijiang's take, though; they know that code better than I do.
On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com> wrote:

Hi Stephan,

I've actually managed to narrow the problem down to blocked Netty server threads. I'm using 1.9.1 with a few custom patches <https://github.com/dmvk/flink/commits/1.9.1-szn> that are not relevant to this issue.

To highlight the problem, I've added these checks to ResultPartitionManager <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which measure how long the "netty tasks" take to execute (monitor acquisition + actual execution).

We indeed have a pretty busy cluster, with high load and IO waits (mostly due to ongoing shuffles and computations). From the measurements I can see numbers like:

createSubpartitionView: 129255ms
createSubpartitionView: 129333ms
createSubpartitionView: 129353ms
createSubpartitionView: 129354ms
createSubpartitionView: 144419ms
createSubpartitionView: 144653ms
createSubpartitionView: 144759ms
createSubpartitionView: 144905ms
releasePartition: 145218ms
releasePartition: 145250ms
releasePartition: 145256ms
releasePartition: 145263ms

These vary a lot, depending on which other pipelines are being executed at the same time. The numbers imply that for at least 145 seconds (which is greater than the connection timeout) the TaskManager was not able to accept any connection (because of Netty internals <https://github.com/netty/netty/issues/240>).

Switching to the *file*-backed BoundedData implementation didn't help, because heavy IO operations are still executed by the Netty threads while the monitor is held (e.g. deletion of the backing file).

1) I've tried more fine-grained locking (locking a single partition instead of the whole partitionManager). This helped a little, but the pipeline is still unable to finish in some cases.

2) Currently I'm trying to completely remove locking from ResultPartitionManager, as it seems to be relevant only for internal data-structure access (which can be replaced with java.util.concurrent). I think this should also have an impact on overall job completion times in some cases.

What do you think?
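(To make (2) concrete, a rough sketch of the kind of change being described: a java.util.concurrent map instead of a manager-wide monitor. Class, method, and type names below are simplified placeholders, not the actual ResultPartitionManager code or the patch itself.)

// Illustration only: a registry that is thread-safe without a manager-wide monitor,
// so a slow createSubpartitionView()/releasePartition() on one partition no longer
// blocks the Netty server threads serving other partitions.
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConcurrentPartitionRegistry {

    interface Partition {
        ReadView createReadView(int subpartitionIndex) throws IOException;
        void release();
    }

    interface ReadView { }

    private final ConcurrentMap<String, Partition> registeredPartitions = new ConcurrentHashMap<>();

    void registerPartition(String partitionId, Partition partition) {
        if (registeredPartitions.putIfAbsent(partitionId, partition) != null) {
            throw new IllegalStateException("Partition already registered: " + partitionId);
        }
    }

    ReadView createSubpartitionView(String partitionId, int subpartitionIndex) throws IOException {
        Partition partition = registeredPartitions.get(partitionId);
        if (partition == null) {
            throw new IOException("Unknown partition: " + partitionId);
        }
        // The potentially slow, IO-bound work (mmap slicing, opening/deleting backing
        // files, ...) now runs without holding a lock that other requests need.
        return partition.createReadView(subpartitionIndex);
    }

    void releasePartition(String partitionId) {
        Partition partition = registeredPartitions.remove(partitionId);
        if (partition != null) {
            partition.release();
        }
    }
}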
On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org> wrote:

Hi!

Concerning JAR files: I think this has nothing to do with it, it is a batch shuffle after all. The previous stage must have completed already.

A few things that come to my mind:
  - What Flink version are you using? 1.9?
  - Are you sure that the source TaskManager is still running? Earlier Flink versions had an issue with releasing TMs too early, sometimes before the result was fetched by a consumer.
  - The buffer creation on the sender / Netty server side is more expensive than necessary, but should be nowhere near expensive enough to cause a stall.

Can you elaborate on what the shared lock is that all server threads are using?

Best,
Stephan

On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

> In case of large jar, wouldn't this happen in previous stages as well (if so this should not be the case)?

I'm not exactly sure how jars are distributed, but if they are being sent/uploaded from one node (or some other static/fixed number of nodes, like uploading to and reading from a DFS) to all, this might not scale well. Also, your dev deployment might not be stressing network/storage/something as much as the production deployment, which can also affect the time it takes to deploy the job.

What's your job size? (How large is the jar uploaded to Flink?)

There might be other factors in play here as well, for example if you are using Flink's job mode (not standalone): the time to start up a Flink node might be too long, so some nodes are already up and running and they time out waiting for the others to start.

> Also there shouldn't be any state involved (unless Beam IO's use it internally).

My bad. Instead of

> - data size per single job run ranging from 100G to 1TB

I read state size 100G to 1TB.

Piotrek

On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com> wrote:

Hi David,

The usual cause for a connection time out is a long deployment, for example if your job's jar is large and takes a long time to distribute across the cluster. I'm not sure whether large state could affect this as well or not. Are you sure that's not the case?

The thing you are suggesting I haven't heard about before, but indeed it could theoretically happen. Reading from mmap'ed sub-partitions could block the Netty threads if the kernel decides to drop an mmap'ed page and it has to be read back from disk. Could you check your CPU and disk IO usage? This should be visible as high IOWait CPU usage. Could you, for example, post a couple of sample results of

iostat -xm 2

from some representative TaskManager? If the disks are indeed overloaded, changing Flink's config option

taskmanager.network.bounded-blocking-subpartition-type

from the default to `file` could solve the problem. FYI, this option is renamed in 1.10 to

taskmanager.network.blocking-shuffle.type

and its default value will be `file`.

We would appreciate it if you could get back to us with the results!

Piotrek
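(A minimal sketch of flipping that option programmatically, for a quick local/test reproduction; on a standalone cluster like the one in this thread the same key goes into flink-conf.yaml on the TaskManagers instead. The key name is the 1.9.x one quoted above; the surrounding main() is just illustrative scaffolding.)

// Sketch: switch the bounded-blocking subpartition type to `file` for a local test run.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BlockingShuffleConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Flink 1.9.x key; renamed to "taskmanager.network.blocking-shuffle.type" in 1.10,
        // where `file` becomes the default.
        conf.setString("taskmanager.network.bounded-blocking-subpartition-type", "file");

        // Local environment picks up the configuration for its embedded TaskManagers.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        // ... build and execute the job against `env` ...
    }
}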
On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:

Hi David,

I'm unfortunately not familiar with these parts of Flink, but I'm pulling in Piotr, who might be able to tell you more.

Cheers,
Till

On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:

Hello community,

I'm currently struggling with an Apache Beam batch pipeline on top of Flink. The pipeline runs smoothly in smaller environments, but in production it always ends up with a `connection timeout` in one of the last shuffle phases.

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection timed out: ##########/10.249.28.39:25709
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at ...

Basically the pipeline looks as follows:

read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation, a few secs per element) -> write to multiple outputs (~4)

- cluster size: 100 TMs
- slots per TM: 4
- data size per single job run ranging from 100G to 1TB
- job parallelism: 400

I've tried to increase the Netty `taskmanager.network.netty.client.connectTimeoutSec`, with no luck. Increasing the number of Netty threads did not help either. The JVM performs OK (no OOMs, no GC pauses, ...). The connect backlog defaults to 128.

This is probably caused by Netty threads being blocked on the server side. All these threads share the same lock, so increasing the number of threads won't help.
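(As a reading aid for the thread dump that follows: heavily simplified, and not the actual Flink source, the contention pattern boils down to something like the code below, with one TaskManager-wide monitor held while the potentially slow read-view creation runs inside it.)

// Simplified illustration of the server-side bottleneck shown in the thread dump below;
// not the real Flink classes. Every partition request handled by a Netty server thread
// serializes on a single manager-wide monitor, and slow IO (mmap slicing, file access)
// happens while that monitor is held.
class PartitionRegistrySketch {

    interface Partition {
        Object createReadView(int subpartitionIndex);  // may touch mmap'ed memory / disk
    }

    private final java.util.HashMap<String, Partition> registeredPartitions =
            new java.util.HashMap<>();

    Object createSubpartitionView(String partitionId, int subpartitionIndex) {
        synchronized (registeredPartitions) {           // one lock for the whole TaskManager
            Partition partition = registeredPartitions.get(partitionId);
            // The IO-bound work runs inside the critical section, so every other
            // partition request (and release) on this TaskManager has to wait.
            return partition.createReadView(subpartitionIndex);
        }
    }
}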
"Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.Number.<init>(Number.java:55)
    at java.lang.Long.<init>(Long.java:947)
    at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
    at java.lang.reflect.Field.get(Field.java:393)
    at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
    at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
    at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
    at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
    at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
    at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
    at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
    at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
    at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
    - locked <0x00000006d822e180> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
    - locked <0x00000006cad32578> (a java.util.HashMap)
    at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
    - locked <0x000000079767ff38> (a java.lang.Object)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
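(The top frames above hint at where part of the per-buffer cost comes from: each sliced buffer wraps a new memory segment whose off-heap address is obtained via a reflective Field.get, which also boxes a Long. A contrived stand-alone illustration of that per-call reflective access, not Flink code:)

// Illustrative only: reading the off-heap address of a direct buffer via reflection,
// roughly the kind of per-buffer work visible in the frames above.
// Runs as-is on Java 8; newer JDKs need --add-opens java.base/java.nio=ALL-UNNAMED.
import java.lang.reflect.Field;
import java.nio.Buffer;
import java.nio.ByteBuffer;

public class ReflectiveAddressLookup {
    public static void main(String[] args) throws Exception {
        Field addressField = Buffer.class.getDeclaredField("address");
        addressField.setAccessible(true);

        ByteBuffer direct = ByteBuffer.allocateDirect(4096);
        // Each reflective read boxes a Long; doing this once per sliced buffer
        // adds avoidable per-buffer overhead on the hot path.
        long address = (Long) addressField.get(direct);
        System.out.println("off-heap address: " + address);
    }
}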
This may be related to the mmap-backed BoundedData implementation, where `nextBuffer` seems to be somewhat expensive (reflection, skipping empty buffers?). Just to note, the last phases only shuffle metadata (kilobyte scale), but the job parallelism remains the same (400) due to Beam's nature.

Does this sound familiar to anyone? Do you have any suggestions on how to solve this?

Thanks for help,
David