Just to clarify, these are bare-metal nodes (128 GB RAM, 16 CPUs + hyperthreading, 4x HDD, 10G network), which run YARN, HDFS and HBase.
D.

On Wed, Jan 29, 2020 at 5:03 PM David Morávek <david.mora...@gmail.com> wrote:

> Hi Piotr,
>
> Removal of the buffer prefetch in BoundedBlockingSubpartitionReader did
> not help, I've already tried that (there are still other problematic
> code paths, e.g. releasePartition). I think it's perfectly OK to perform
> IO ops in netty threads, we just have to make sure we can leverage
> multiple threads at once. Synchronization in ResultPartitionManager
> effectively decreases parallelism to one, and "netty tasks / unprocessed
> messages" keep piling up.
>
> Removing synchronization *did solve* the problem for me, because it
> allows Flink to leverage the whole netty event loop pool, and it's OK to
> have a single thread blocked for a little while (we can still accept
> connections with the other threads).
>
> Let me think about how to get a relevant CPU graph from the TM; it's
> kind of hard to target a "defective node". Anyway, attached are some
> graphs from such a busy node at the time of failure.
>
> Is there any special reason for the synchronization that I don't see? I
> have a feeling it's only there to synchronize access to the
> `registeredPartitions` map, and that it's perfectly OK not to
> synchronize the `createSubpartitionView` and `releasePartition` calls.
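>
> To sketch what I mean (a simplified illustration with stand-in types,
> not the actual Flink class -- it assumes the map is the only state the
> monitor protects):
>
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Stand-ins for the real Flink types, just to keep the sketch compilable.
> interface Partition {
>   View createView(int subpartition) throws java.io.IOException;
>   void release();
> }
> interface View {}
>
> public final class UnsynchronizedPartitionManager {
>
>   // A ConcurrentHashMap keeps the map consistent without a global monitor.
>   private final Map<String, Partition> registeredPartitions =
>       new ConcurrentHashMap<>();
>
>   public void register(String id, Partition partition) {
>     // putIfAbsent gives us the "already registered" check atomically.
>     if (registeredPartitions.putIfAbsent(id, partition) != null) {
>       throw new IllegalStateException("Partition already registered: " + id);
>     }
>   }
>
>   public View createSubpartitionView(String id, int subpartition)
>       throws java.io.IOException {
>     Partition partition = registeredPartitions.get(id);
>     if (partition == null) {
>       throw new java.io.IOException("Unknown partition: " + id);
>     }
>     // Potentially blocking IO happens here, but without a shared monitor
>     // it only stalls the one netty thread that executes it.
>     return partition.createView(subpartition);
>   }
>
>   public void releasePartition(String id) {
>     Partition partition = registeredPartitions.remove(id);
>     if (partition != null) {
>       partition.release(); // may do heavy IO, again without a global lock
>     }
>   }
> }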
>
> Thanks,
> D.
>
> On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <pi...@ververica.com>
> wrote:
>
>> Hi David,
>>
>> > with high load and io waits
>>
>> How high are the values we are talking about?
>>
>> Could you attach a CPU profiler and post the results somehow? Which
>> threads are busy on what call trees?
>>
>> Regarding the idea of removing the locks in the
>> `ResultPartitionManager`: I guess it could help a bit, but I wouldn't
>> expect it to solve the problem fully.
>>
>> Doing blocking IO in the Netty threads is already asking for trouble,
>> as even without locks it can block and crash many things (like
>> createSubpartitionView). But one improvement that we might take a look
>> at is that the `BoundedBlockingSubpartitionReader` constructor does
>> blocking calls to pre-fetch the buffer. If not for that, the code
>> inside `ResultPartitionManager` would probably be non-blocking, or at
>> least much less blocking.
>>
>> Piotrek
>>
>> > On 29 Jan 2020, at 14:05, Stephan Ewen <se...@apache.org> wrote:
>> >
>> > /CC Piotr and Zhijiang
>> >
>> > Sounds reasonable at first glance. Would like to hear Piotr's and
>> > Zhijiang's take, though, they know that code better than me.
>> >
>> > On Wed, Jan 29, 2020 at 1:58 PM David Morávek
>> > <david.mora...@gmail.com> wrote:
>> > Hi Stephan,
>> >
>> > I've actually managed to narrow the problem down to blocked netty
>> > server threads. I'm using 1.9.1 with a few custom patches
>> > <https://github.com/dmvk/flink/commits/1.9.1-szn>, which are not
>> > relevant to this issue.
>> >
>> > To highlight the problem, I've added these checks to
>> > ResultPartitionManager
>> > <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>,
>> > which measure how long "netty tasks" take to execute (monitor
>> > acquisition + actual execution).
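>> >
>> > The checks boil down to a wrapper like this (a simplified sketch with
>> > made-up names; the actual patch is in the gist above):
>> >
>> > import java.util.concurrent.TimeUnit;
>> > import java.util.function.Supplier;
>> >
>> > final class NettyTaskTimer {
>> >
>> >   // Time a "netty task": the measurement covers both monitor
>> >   // acquisition and the actual execution, which is exactly where
>> >   // the stalls show up.
>> >   static <T> T timed(String name, Object monitor, Supplier<T> task) {
>> >     long start = System.nanoTime();
>> >     try {
>> >       synchronized (monitor) {
>> >         return task.get();
>> >       }
>> >     } finally {
>> >       long ms = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
>> >       // produces lines like "createSubpartitionView: 129255ms"
>> >       System.err.println(name + ": " + ms + "ms");
>> >     }
>> >   }
>> > }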
>> >
>> > We indeed have a pretty busy cluster, with high load and IO waits
>> > (mostly due to ongoing shuffles and computations). From the
>> > measurements I can see numbers like:
>> >
>> > createSubpartitionView: 129255ms
>> > createSubpartitionView: 129333ms
>> > createSubpartitionView: 129353ms
>> > createSubpartitionView: 129354ms
>> > createSubpartitionView: 144419ms
>> > createSubpartitionView: 144653ms
>> > createSubpartitionView: 144759ms
>> > createSubpartitionView: 144905ms
>> > releasePartition: 145218ms
>> > releasePartition: 145250ms
>> > releasePartition: 145256ms
>> > releasePartition: 145263ms
>> >
>> > These vary a lot, depending on what other pipelines are being
>> > executed simultaneously. They imply that for at least 145 seconds
>> > (which is greater than the connection timeout), the taskmanager was
>> > not able to accept any connection (because of netty internals
>> > <https://github.com/netty/netty/issues/240>).
>> >
>> > Switching to the *file* backed BoundedData implementation didn't
>> > help, because there are still heavy IO ops being executed by netty
>> > threads while the monitor is held (e.g. deletion of the backing
>> > file).
>> >
>> > 1) I've tried more fine-grained locking (locking a single partition
>> > instead of the whole partitionManager). This helped a little, but
>> > the pipeline is still unable to finish in some cases.
>> >
>> > 2) Currently I'm trying to completely remove locking from
>> > ResultPartitionManager, as it seems to be relevant only for internal
>> > data structure access (which can be replaced with
>> > java.util.concurrent). I think this should also have an impact on
>> > overall job completion times in some cases.
>> >
>> > What do you think?
>> >
>> > On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org>
>> > wrote:
>> >
>> > > Hi!
>> > >
>> > > Concerning JAR files: I think this has nothing to do with it, it
>> > > is a batch shuffle after all. The previous stage must have
>> > > completed already.
>> > >
>> > > A few things that come to my mind:
>> > >   - What Flink version are you using? 1.9?
>> > >   - Are you sure that the source TaskManager is still running?
>> > > Earlier Flink versions had an issue with releasing TMs too early,
>> > > sometimes before the result was fetched by a consumer.
>> > >   - The buffer creation on the sender / netty server side is more
>> > > expensive than necessary, but should be nowhere near expensive
>> > > enough to cause a stall.
>> > >
>> > > Can you elaborate on what the shared lock is that all server
>> > > threads are using?
>> > >
>> > > Best,
>> > > Stephan
>> > >
>> > > On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski
>> > > <pi...@ververica.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > > In case of large jar, wouldn't this happen in previous stages
>> > > > > as well (if so, this should not be the case)?
>> > > >
>> > > > I'm not exactly sure how jars are distributed, but if they are
>> > > > being sent/uploaded from one node (or some other static/fixed
>> > > > number, like uploading to and reading from a DFS) to all, this
>> > > > might not scale well. Also, your dev deployment might not be
>> > > > stressing network/storage/something as much as the production
>> > > > deployment, which can also affect the time to deploy the job.
>> > > >
>> > > > What's your job's size? (How large is the jar uploaded to
>> > > > Flink?)
>> > > >
>> > > > Also, there might be other factors in play here, like if you are
>> > > > using Flink job mode (not standalone), the time to start up a
>> > > > Flink node might be too long: some nodes are already up and
>> > > > running, and they time out waiting for the others to start up.
>> > > >
>> > > > > Also there shouldn't be any state involved
>> > > > > (unless Beam IOs use it internally).
>> > > >
>> > > > My bad. Instead of
>> > > >
>> > > > > - data size per single job run ranging from 100G to 1TB
>> > > >
>> > > > I read state size 100G to 1TB.
>> > > >
>> > > > Piotrek
>> > > >
>> > > > > On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski
>> > > > > <pi...@ververica.com> wrote:
>> > > > >
>> > > > >> Hi David,
>> > > > >>
>> > > > >> The usual cause for a connection timeout is a long
>> > > > >> deployment, for example if your job's jar is large and takes
>> > > > >> a long time to distribute across the cluster. I'm not sure if
>> > > > >> large state could affect this as well or not. Are you sure
>> > > > >> that's not the case?
>> > > > >>
>> > > > >> The thing you are suggesting I haven't heard about
>> > > > >> previously, but indeed, theoretically it could happen.
>> > > > >> Reading from mmap'ed subpartitions could block the Netty
>> > > > >> threads if the kernel decides to drop an mmap'ed page and it
>> > > > >> has to be read from the disks. Could you check your CPU and
>> > > > >> disk IO usage? This should be visible as high IOWait CPU
>> > > > >> usage. Could you for example post a couple of sample results
>> > > > >> of
>> > > > >>
>> > > > >> iostat -xm 2
>> > > > >>
>> > > > >> from some representative Task Manager? If the disks are
>> > > > >> indeed overloaded, changing Flink's config option
>> > > > >>
>> > > > >> taskmanager.network.bounded-blocking-subpartition-type
>> > > > >>
>> > > > >> from the default to `file` could solve the problem. FYI, this
>> > > > >> option is renamed in 1.10 to
>> > > > >>
>> > > > >> taskmanager.network.blocking-shuffle.type
>> > > > >>
>> > > > >> and its default value will be `file`.
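>> > > > >>
>> > > > >> That is, something like this in flink-conf.yaml (1.9 option
>> > > > >> name, untested snippet):
>> > > > >>
>> > > > >>   taskmanager.network.bounded-blocking-subpartition-type: file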
>> > > > >>
>> > > > >> We would appreciate it if you could get back to us with the
>> > > > >> results!
>> > > > >>
>> > > > >> Piotrek
>> > > > >>
>> > > > >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org>
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>> Hi David,
>> > > > >>>
>> > > > >>> I'm unfortunately not familiar with these parts of Flink, but
>> > > > >>> I'm pulling in Piotr, who might be able to tell you more.
>> > > > >>>
>> > > > >>> Cheers,
>> > > > >>> Till
>> > > > >>>
>> > > > >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org>
>> > > > >>> wrote:
>> > > > >>>
>> > > > >>>> Hello community,
>> > > > >>>>
>> > > > >>>> I'm currently struggling with an Apache Beam batch pipeline
>> > > > >>>> on top of Flink. The pipeline runs smoothly in smaller
>> > > > >>>> environments, but in production it always ends up with a
>> > > > >>>> `connection timeout` in one of the last shuffle phases.
>> > > > >>>>
>> > > > >>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>> > > > >>>> Connection for partition
>> > > > >>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f
>> > > > >>>> not reachable.
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>> > > > >>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>> > > > >>>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>> > > > >>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>> > > > >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> > > > >>>>     at java.lang.Thread.run(Thread.java:748)
>> > > > >>>>     ...
>> > > > >>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> > > > >>>> Connection timed out: ##########/10.249.28.39:25709
>> > > > >>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> > > > >>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>> > > > >>>>     at ...
>> > > > >>>>
>> > > > >>>> Basically the pipeline looks as follows:
>> > > > >>>>
>> > > > >>>> read "skewed" sources -> reshuffle -> flatMap (performs a
>> > > > >>>> heavy computation - a few secs per element) -> write to
>> > > > >>>> multiple outputs (~4)
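>> > > > >>>>
>> > > > >>>> In Beam terms the shape is roughly this (a simplified sketch
>> > > > >>>> with made-up sources/sinks, just to show the structure):
>> > > > >>>>
>> > > > >>>> import org.apache.beam.sdk.Pipeline;
>> > > > >>>> import org.apache.beam.sdk.io.TextIO;
>> > > > >>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> > > > >>>> import org.apache.beam.sdk.transforms.DoFn;
>> > > > >>>> import org.apache.beam.sdk.transforms.ParDo;
>> > > > >>>> import org.apache.beam.sdk.transforms.Reshuffle;
>> > > > >>>> import org.apache.beam.sdk.values.PCollection;
>> > > > >>>>
>> > > > >>>> public class PipelineSketch {
>> > > > >>>>   public static void main(String[] args) {
>> > > > >>>>     Pipeline p =
>> > > > >>>>         Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>> > > > >>>>
>> > > > >>>>     PCollection<String> input =
>> > > > >>>>         p.apply(TextIO.read().from("hdfs:///input/*")); // "skewed" sources
>> > > > >>>>
>> > > > >>>>     input
>> > > > >>>>         // break the skew before the heavy stage
>> > > > >>>>         .apply(Reshuffle.viaRandomKey())
>> > > > >>>>         .apply(ParDo.of(new DoFn<String, String>() {
>> > > > >>>>           @ProcessElement
>> > > > >>>>           public void processElement(ProcessContext c) {
>> > > > >>>>             // stands in for the heavy computation, a few secs per element
>> > > > >>>>             c.output(c.element());
>> > > > >>>>           }
>> > > > >>>>         }))
>> > > > >>>>         .apply(TextIO.write().to("hdfs:///output/main"));
>> > > > >>>>     // (the real job writes to ~4 outputs)
>> > > > >>>>
>> > > > >>>>     p.run();
>> > > > >>>>   }
>> > > > >>>> }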
>> > > > >>>> >> > > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 >> > > > >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000] >> > > > >>>> java.lang.Thread.State: RUNNABLE >> > > > >>>> at java.lang.Number.<init>(Number.java:55) >> > > > >>>> at java.lang.Long.<init>(Long.java:947) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36) >> > > > >>>> at java.lang.reflect.Field.get(Field.java:393) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66) >> > > > >>>> at >> > > > >>>> >> > > > >>>> >> > > > >> >> > > > >> > > >> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201) >> > > > >>>> - locked <0x00000006d822e180> (a java.lang.Object) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72) >> > > > >>>> - locked <0x00000006cad32578> (a java.util.HashMap) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86) >> > > > >>>> - locked <0x000000079767ff38> (a java.lang.Object) >> > > > >>>> at >> > > > >>>> org.apache.flink.runtime.io < >> http://org.apache.flink.runtime.io/> >> > > > >>>> >> > > > >> >> > > > >> > > >> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102) >> > 
>> > > > >>>>
>> > > > >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
>> > > > >>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>> > > > >>>>    java.lang.Thread.State: RUNNABLE
>> > > > >>>>     at java.lang.Number.<init>(Number.java:55)
>> > > > >>>>     at java.lang.Long.<init>(Long.java:947)
>> > > > >>>>     at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>> > > > >>>>     at java.lang.reflect.Field.get(Field.java:393)
>> > > > >>>>     at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>> > > > >>>>     at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>> > > > >>>>     at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>> > > > >>>>     at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>> > > > >>>>     at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>> > > > >>>>     - locked <0x00000006d822e180> (a java.lang.Object)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>> > > > >>>>     at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>> > > > >>>>     - locked <0x00000006cad32578> (a java.util.HashMap)
>> > > > >>>>     at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>> > > > >>>>     - locked <0x000000079767ff38> (a java.lang.Object)
>> > > > >>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>> > > > >>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>> > > > >>>>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> > > > >>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>> > > > >>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>> > > > >>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>> > > > >>>>
>> > > > >>>> This may be related to the mmap backed BoundedData
>> > > > >>>> implementation, where `nextBuffer` seems to be somewhat
>> > > > >>>> expensive (reflection, skipping empty buffers?). Just to
>> > > > >>>> note, the last phases only shuffle metadata (kilobyte
>> > > > >>>> scale), but the job parallelism remains the same due to
>> > > > >>>> Beam's nature (400).
>> > > > >>>>
>> > > > >>>> Does this sound familiar to anyone? Do you have any
>> > > > >>>> suggestions on how to solve this?
>> > > > >>>>
>> > > > >>>> Thanks for help,
>> > > > >>>> David