One more thing. Could you create a JIRA ticket for this issue? We could also move the discussion there.
Piotrek

> On 30 Jan 2020, at 12:14, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi,
>
>>> I think it's perfectly ok to perform IO ops in Netty threads,
> (…)
>>> Removing synchronization *did solve* the problem for me, because it
>>> allows Flink to leverage the whole Netty event loop pool and it's ok to
>>> have a single thread blocked for a little while (we can still accept
>>> connections with other threads).
>
> It’s a discouraged pattern: Netty has a thread pool for processing multiple
> channels, but a single channel is always handled by the same pre-defined
> thread (to the best of my knowledge). In Flink we are lucky that, besides
> registering partitions, Netty threads are not doing anything critical that
> could fail the job if blocked (heartbeats are handled independently). And I
> guess you are right: if some threads are blocked on IO, new (sub)partition
> registrations would be handled by the non-blocked threads, if not for the
> global lock.
>
> It sounds very hacky though. It also ignores the performance implications -
> one thread blocked on disk IO wastes the CPU/network potential of ~1/16 of
> the channels (due to this fixed, pre-determined assignment between channels
> <-> threads). In some scenarios that might be acceptable, e.g. with uniform
> tasks and no data skew. But if multiple tasks with different workload
> patterns and/or data skew run simultaneously, this can cause visible
> performance issues.
>
> Having said that, any rework to fully address this issue and make the IO
> non-blocking could be very substantial, so I would be more than happy to
> just kick the can down the road for now ;)
>
>>> Removal of the buffer prefetch in BoundedBlockingSubpartitionReader did
>>> not help, I've already tried that (there are still other problematic code
>>> paths, e.g. releasePartition).
>
> Are there other problematic parts besides releasePartition that you have
> already analysed? Maybe it would be better to just try moving those calls
> out of the `ResultPartitionManager` call stack somehow?
>
>>> Let me think about how to get a relevant CPU graph from the TM, it's kind
>>> of hard to target a "defective node".
>
> Thanks, I know it’s non-trivial, but I would guess you do not have to target
> a “defective node”. If the defective node is blocked for ~2 minutes during
> the failure, I’m pretty sure other nodes are being blocked constantly for
> seconds at a time, and profiler results from such nodes would allow us to
> confirm the issue and better understand what exactly is happening.
>
>>> Anyway, attached are some graphs from such a busy node at the time of
>>> failure.
>
> I didn’t get/see any graphs?
>
>>> Is there any special reason for the synchronization that I don't see? I
>>> have a feeling it's only there to synchronize `registeredPartitions` map
>>> access and that it's perfectly ok not to synchronize the
>>> `createSubpartitionView` and `releasePartition` calls.
>
> I’m not sure. Removing this lock definitely increases concurrency and
> therefore the potential for race conditions, especially on the
> resource-releasing paths. After briefly looking at the code, I didn’t find
> any obvious issue, but there are some callbacks/notifications happening, and
> generally speaking resource-releasing paths are pretty hard to reason about.
>
> Zhijiang might spot something, as he has had a good eye for catching such
> problems in the past.
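(For illustration only, this is not the actual Flink code: a minimal sketch of what "moving those calls out of the `ResultPartitionManager` call stack" could look like, i.e. handing the blocking view creation to a dedicated IO pool so the Netty event loop thread returns immediately. All names below are hypothetical, and a real change would also need back-pressure and error propagation on the channel.)

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch: instead of creating the (potentially disk-reading)
    // subpartition view directly on the Netty event-loop thread, hand the
    // blocking part off to a dedicated IO pool and complete the request
    // asynchronously once the view is ready.
    public class NonBlockingViewRequestSketch {

        /** Placeholder for whatever the reader needs; not the real Flink type. */
        interface SubpartitionView {}

        // Separate pool for blocking disk IO, sized independently of the Netty threads.
        private final ExecutorService ioExecutor = Executors.newFixedThreadPool(8);

        /** Called from the Netty handler; must return quickly. */
        public CompletableFuture<SubpartitionView> requestView(String partitionId) {
            return CompletableFuture.supplyAsync(() -> blockingCreateView(partitionId), ioExecutor);
            // The handler would attach a callback that enqueues the view on the
            // channel once it is ready, instead of waiting for it inline.
        }

        /** Stands in for the blocking work (mmap/file reads, buffer prefetch, ...). */
        private SubpartitionView blockingCreateView(String partitionId) {
            return new SubpartitionView() {};
        }
    }

Whether something like this helps in practice depends on nothing upstream waiting synchronously for the future, which is exactly the kind of substantial rework referred to above.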
> Besides that, you could just run a couple (10? 20?) of Travis runs. All of
> the ITCases from various modules (connectors, flink-tests, …) are pretty
> good at catching race conditions in the network stack.
>
> Piotrek
>
>> On 29 Jan 2020, at 17:05, David Morávek <david.mora...@gmail.com> wrote:
>>
>> Just to clarify, these are bare-metal nodes (128G RAM, 16 CPUs +
>> hyperthreading, 4x HDDs, 10G network), which run YARN, HDFS and HBase.
>>
>> D.
>>
>> On Wed, Jan 29, 2020 at 5:03 PM David Morávek <david.mora...@gmail.com>
>> wrote:
>>
>>> Hi Piotr,
>>>
>>> Removal of the buffer prefetch in BoundedBlockingSubpartitionReader did
>>> not help, I've already tried that (there are still other problematic code
>>> paths, e.g. releasePartition). I think it's perfectly ok to perform IO ops
>>> in Netty threads, we just have to make sure we can leverage multiple
>>> threads at once. Synchronization in ResultPartitionManager effectively
>>> decreases parallelism to one, and "netty tasks / unprocessed messages"
>>> keep piling up.
>>>
>>> Removing synchronization *did solve* the problem for me, because it
>>> allows Flink to leverage the whole Netty event loop pool and it's ok to
>>> have a single thread blocked for a little while (we can still accept
>>> connections with other threads).
>>>
>>> Let me think about how to get a relevant CPU graph from the TM, it's kind
>>> of hard to target a "defective node". Anyway, attached are some graphs
>>> from such a busy node at the time of failure.
>>>
>>> Is there any special reason for the synchronization that I don't see? I
>>> have a feeling it's only there to synchronize `registeredPartitions` map
>>> access and that it's perfectly ok not to synchronize the
>>> `createSubpartitionView` and `releasePartition` calls.
>>>
>>> Thanks,
>>> D.
>>>
>>> On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>>> with high load and io waits
>>>>
>>>> How high are the values we are talking about?
>>>>
>>>> Could you attach a CPU profiler and post the results somehow? Which
>>>> threads are busy on what call trees?
>>>>
>>>> Regarding the idea of removing the locks in the
>>>> `ResultPartitionManager`: I guess it could help a bit, but I wouldn’t
>>>> expect it to solve the problem fully.
>>>>
>>>> Doing blocking IO in the Netty threads is already asking for trouble,
>>>> as even without locks it can block and crash many things (like
>>>> createSubpartitionView). But one improvement that we might take a look
>>>> at is that the `BoundedBlockingSubpartitionReader` constructor makes a
>>>> blocking call to pre-fetch the buffer. If not for that, the code inside
>>>> `ResultPartitionManager` would probably be non-blocking, or at least
>>>> much less blocking.
>>>>
>>>> Piotrek
>>>>
>>>>> On 29 Jan 2020, at 14:05, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>> /CC Piotr and Zhijiang
>>>>>
>>>>> Sounds reasonable at first glance. Would like to hear Piotr's and
>>>>> Zhijiang's take, though; they know that code better than me.
>>>>>
>>>>> On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com>
>>>>> wrote:
>>>>> Hi Stephan,
>>>>>
>>>>> I've actually managed to narrow the problem down to blocked Netty
>>>>> server threads. I'm using 1.9.1 with a few custom patches
>>>>> <https://github.com/dmvk/flink/commits/1.9.1-szn>, which are not
>>>>> relevant to this issue.
>>>>>
>>>>> To highlight the problem, I've added these checks to
>>>>> ResultPartitionManager
>>>>> <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which
>>>>> measure how long "netty tasks" take to execute (monitor acquisition +
>>>>> actual execution).
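(As a rough, hypothetical sketch, independent of the linked gist: a check of this kind, timing the wait for the monitor separately from the execution under it, could look like the following. Field and method names are illustrative only, not the actual patch.)

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the kind of timing check described above: measure
    // how long a "netty task" waits for the ResultPartitionManager monitor and
    // how long the actual work takes once the lock is held.
    public class TimedPartitionManagerSketch {

        private final Map<String, Object> registeredPartitions = new HashMap<>();

        public Object createSubpartitionView(String partitionId) {
            long beforeLock = System.nanoTime();
            synchronized (registeredPartitions) {
                long lockAcquired = System.nanoTime();
                try {
                    return doCreateView(partitionId); // potentially blocking disk IO
                } finally {
                    long done = System.nanoTime();
                    log("createSubpartitionView", lockAcquired - beforeLock, done - lockAcquired);
                }
            }
        }

        private Object doCreateView(String partitionId) {
            return new Object(); // placeholder for the real view creation
        }

        private void log(String op, long lockWaitNanos, long executionNanos) {
            System.out.printf("%s: waited %d ms for lock, executed in %d ms%n",
                    op, lockWaitNanos / 1_000_000, executionNanos / 1_000_000);
        }
    }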
>>>>> We indeed have a pretty busy cluster, with high load and IO waits
>>>>> (mostly due to ongoing shuffles and computations). From the
>>>>> measurements I can see numbers like:
>>>>>
>>>>> createSubpartitionView: 129255ms
>>>>> createSubpartitionView: 129333ms
>>>>> createSubpartitionView: 129353ms
>>>>> createSubpartitionView: 129354ms
>>>>> createSubpartitionView: 144419ms
>>>>> createSubpartitionView: 144653ms
>>>>> createSubpartitionView: 144759ms
>>>>> createSubpartitionView: 144905ms
>>>>> releasePartition: 145218ms
>>>>> releasePartition: 145250ms
>>>>> releasePartition: 145256ms
>>>>> releasePartition: 145263ms
>>>>>
>>>>> These vary a lot, depending on what other pipelines are being executed
>>>>> simultaneously. These numbers imply that for at least 145 seconds
>>>>> (which is greater than the connection timeout), the TaskManager was not
>>>>> able to accept any connection (because of Netty internals
>>>>> <https://github.com/netty/netty/issues/240>).
>>>>>
>>>>> Switching to the *file*-backed BoundedData implementation didn't help,
>>>>> because there are still heavy IO ops being executed by Netty threads
>>>>> while the monitor is held (e.g. deletion of the backing file).
>>>>>
>>>>> 1) I've tried more fine-grained locking (locking a single partition
>>>>> instead of the partitionManager). This helped a little, but the
>>>>> pipeline is still unable to finish in some cases.
>>>>>
>>>>> 2) Currently I'm trying to completely remove locking from
>>>>> ResultPartitionManager, as it seems to be relevant only for internal
>>>>> data structure access (it can be replaced with java.util.concurrent). I
>>>>> think this should also have an impact on overall job completion times
>>>>> in some cases.
>>>>>
>>>>> What do you think?
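(A rough sketch of what option 2 might look like, with the manager-wide synchronized blocks replaced by a ConcurrentHashMap. The types are simplified placeholders, not the real Flink classes, and the open question raised above, whether the release callbacks stay race-free for the same partition without the global monitor, is exactly what such a sketch does not answer.)

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical sketch: guard only the partition registry with a concurrent
    // map instead of serializing every create/release call on one monitor.
    public class LockFreePartitionManagerSketch {

        interface View {}
        interface Partition {
            View createView();
            void release();
        }

        private final ConcurrentMap<String, Partition> registeredPartitions =
                new ConcurrentHashMap<>();

        public void registerPartition(String id, Partition partition) {
            if (registeredPartitions.putIfAbsent(id, partition) != null) {
                throw new IllegalStateException("Partition already registered: " + id);
            }
        }

        public View createSubpartitionView(String id) {
            Partition partition = registeredPartitions.get(id);
            if (partition == null) {
                throw new IllegalStateException("Unknown partition: " + id);
            }
            // Blocking IO now only stalls this one Netty thread, not every caller.
            return partition.createView();
        }

        public void releasePartition(String id) {
            Partition partition = registeredPartitions.remove(id);
            if (partition != null) {
                partition.release(); // may still do disk IO, but without the global lock
            }
        }
    }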
>>>>> On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> Concerning JAR files: I think this has nothing to do with it, it is a
>>>>>> batch shuffle after all. The previous stage must have completed already.
>>>>>>
>>>>>> A few things that come to my mind:
>>>>>> - What Flink version are you using? 1.9?
>>>>>> - Are you sure that the source TaskManager is still running? Earlier
>>>>>> Flink versions had an issue with releasing TMs too early, sometimes
>>>>>> before the result was fetched by a consumer.
>>>>>> - The buffer creation on the sender / Netty server side is more
>>>>>> expensive than necessary, but should be nowhere near expensive enough
>>>>>> to cause a stall.
>>>>>>
>>>>>> Can you elaborate on what the shared lock is that all server threads
>>>>>> are using?
>>>>>>
>>>>>> Best,
>>>>>> Stephan
>>>>>>
>>>>>> On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>> In case of large jar, wouldn't this happen in previous stages as well
>>>>>>>> (if so this should not be the case)?
>>>>>>>
>>>>>>> I’m not exactly sure how jars are distributed, but if they are being
>>>>>>> sent/uploaded from one node (or some other static/fixed number of
>>>>>>> nodes, like uploading to and reading from a DFS) to all, this might not
>>>>>>> scale well. Also, your dev deployment might not be stressing
>>>>>>> network/storage/etc. as much as the production deployment, which can
>>>>>>> also affect the time to deploy the job.
>>>>>>>
>>>>>>> What’s your job's size? (How large is the jar uploaded to Flink?)
>>>>>>>
>>>>>>> There might also be other factors in play here. For example, if you are
>>>>>>> using Flink's job mode (not standalone), the time to start up a Flink
>>>>>>> node might be too long: some nodes are already up and running, and they
>>>>>>> time out waiting for the others to start.
>>>>>>>
>>>>>>>> Also there shouldn't be any state involved
>>>>>>>> (unless Beam IOs use it internally).
>>>>>>>
>>>>>>> My bad. Instead of
>>>>>>>
>>>>>>>> - data size per single job run ranging from 100G to 1TB
>>>>>>>
>>>>>>> I read state size 100G to 1TB.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>>> On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> The usual cause for a connection timeout is a long deployment, for
>>>>>>>>> example if your job's jar is large and takes a long time to
>>>>>>>>> distribute across the cluster. I’m not sure if large state could
>>>>>>>>> affect this as well or not. Are you sure that’s not the case?
>>>>>>>>>
>>>>>>>>> The thing you are suggesting I haven’t heard about previously, but
>>>>>>>>> indeed theoretically it could happen. Reading from mmap’ed sub
>>>>>>>>> partitions could block the Netty threads if the kernel decides to
>>>>>>>>> drop an mmap’ed page and it has to be read from disk. Could you check
>>>>>>>>> your CPU and disk IO usage? This should be visible as high IOWait CPU
>>>>>>>>> usage. Could you for example post a couple of sample results of
>>>>>>>>>
>>>>>>>>> iostat -xm 2
>>>>>>>>>
>>>>>>>>> from some representative TaskManager? If the disks are indeed
>>>>>>>>> overloaded, changing Flink’s config option
>>>>>>>>>
>>>>>>>>> taskmanager.network.bounded-blocking-subpartition-type
>>>>>>>>>
>>>>>>>>> from its default to `file` could solve the problem. FYI, this option
>>>>>>>>> is renamed in 1.10 to
>>>>>>>>>
>>>>>>>>> taskmanager.network.blocking-shuffle.type
>>>>>>>>>
>>>>>>>>> and its default value will be `file`.
>>>>>>>>>
>>>>>>>>> We would appreciate it if you could get back to us with the results!
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> I'm unfortunately not familiar with these parts of Flink, but I'm
>>>>>>>>>> pulling in Piotr, who might be able to tell you more.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello community,
>>>>>>>>>>>
>>>>>>>>>>> I'm currently struggling with an Apache Beam batch pipeline on top
>>>>>>>>>>> of Flink.
>>>>>>>>>>> The pipeline runs smoothly in smaller environments, but in
>>>>>>>>>>> production it always ends up with a `connection timeout` in one of
>>>>>>>>>>> the last shuffle phases.
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>>>>>>>>>>> Connection for partition
>>>>>>>>>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>     ...
>>>>>>>>>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>>>>>>>>>> Connection timed out: ##########/10.249.28.39:25709
>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>>>>>>>>     at ...
>>>>>>>>>>>
>>>>>>>>>>> Basically the pipeline looks as follows:
>>>>>>>>>>>
>>>>>>>>>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy
>>>>>>>>>>> computation - a few secs per element) -> write to multiple outputs (~4)
>>>>>>>>>>>
>>>>>>>>>>> - cluster size: 100 TMs
>>>>>>>>>>> - slots per TM: 4
>>>>>>>>>>> - data size per single job run ranging from 100G to 1TB
>>>>>>>>>>> - job parallelism: 400
>>>>>>>>>>>
>>>>>>>>>>> I've tried to increase the Netty
>>>>>>>>>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck.
>>>>>>>>>>> Increasing the number of Netty threads did not help either. The JVM
>>>>>>>>>>> performs ok (no OOMs, GC pauses, ...). The connect backlog defaults
>>>>>>>>>>> to 128.
>>>>>>>>>>>
>>>>>>>>>>> This is probably caused by Netty threads being blocked on the
>>>>>>>>>>> server side. All these threads share the same lock, so increasing
>>>>>>>>>>> the number of threads won't help.
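(To make that failure mode concrete, here is a tiny self-contained demonstration, entirely unrelated to the Flink classes involved: worker threads that all serialize on one monitor whose critical section does blocking work finish in roughly N times the blocking time, no matter how many threads are added. The actual thread dump from the TaskManager follows below.)

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Toy demonstration (not Flink code): with a shared monitor around blocking
    // work, 16 tasks on 16 threads take ~16 x 500 ms in total, i.e. adding
    // threads does not increase throughput.
    public class SharedLockDemo {

        private static final Object LOCK = new Object();

        private static void handleRequest() {
            synchronized (LOCK) {
                try {
                    Thread.sleep(500); // stands in for blocking disk IO under the lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(16);
            long start = System.nanoTime();
            for (int i = 0; i < 16; i++) {
                pool.submit(SharedLockDemo::handleRequest);
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
            System.out.printf("16 'requests' took %d ms in total%n",
                    (System.nanoTime() - start) / 1_000_000);
        }
    }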
>>>>>>>>>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
>>>>>>>>>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>>>>>>>>>>>    java.lang.Thread.State: RUNNABLE
>>>>>>>>>>>     at java.lang.Number.<init>(Number.java:55)
>>>>>>>>>>>     at java.lang.Long.<init>(Long.java:947)
>>>>>>>>>>>     at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>>>>>>>>>>>     at java.lang.reflect.Field.get(Field.java:393)
>>>>>>>>>>>     at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>>>>>>>>>>>     at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>>>>>>>>>>>     at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>>>>>>>>>>>     at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>>>>>>>>>>>     at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>>>>>>>>>>>     - locked <0x00000006d822e180> (a java.lang.Object)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>>>>>>>>>>>     - locked <0x00000006cad32578> (a java.util.HashMap)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>>>>>>>>>>>     - locked <0x000000079767ff38> (a java.lang.Object)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>>>>>>>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>>>>>>>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>>>>>>>>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>>>>>>>>>>>
>>>>>>>>>>> This may be related to the mmap-backed BoundedData implementation,
>>>>>>>>>>> where `nextBuffer` seems to be somewhat expensive (reflection,
>>>>>>>>>>> skipping empty buffers?). Just to note, the last phases only
>>>>>>>>>>> shuffle metadata (kilobyte scale), but the job parallelism remains
>>>>>>>>>>> the same (400) due to the nature of Beam.
>>>>>>>>>>>
>>>>>>>>>>> Does this sound familiar to anyone? Do you have any suggestions on
>>>>>>>>>>> how to solve this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the help,
>>>>>>>>>>> David