One more thing. Could you create a JIRA ticket for this issue? We could also 
move the discussion there.

Piotrek

> On 30 Jan 2020, at 12:14, Piotr Nowojski <pnowoj...@apache.org> wrote:
> 
> Hi,
> 
>>> I think it's perfectly ok to perform IO ops in netty threads,
> (…)
>>> Removing synchronization *did solve* the problem for me, because it
>>> allows Flink to leverage the whole netty event loop pool and it's ok to
>>> have a single thread blocked for a little while (we can still accept
>>> connections with other threads).
> 
> 
> It’s a discouraged pattern: Netty has a thread pool for processing multiple 
> channels, but a single channel is always handled by the same pre-assigned 
> thread (to the best of my knowledge). In Flink we are lucky that Netty 
> threads are not doing anything critical besides registering partitions 
> (heartbeats are handled independently) that could fail the job if blocked. 
> And I guess you are right: if some threads are blocked on IO, new 
> (sub)partition registrations would be handled by the non-blocked threads, if 
> not for the global lock. 
> 
> It sounds very hacky though. It also ignores the performance implication: 
> one thread blocked on disk IO wastes the CPU/network potential of ~1/16 of 
> the channels (due to the fixed, pre-determined assignment between channels 
> and threads). In some scenarios that might be acceptable, e.g. with uniform 
> tasks and no data skew. But if multiple tasks with different workload 
> patterns and/or data skew are running simultaneously, this can cause visible 
> performance issues.
> 
> Having said that, any rework to fully address this issue and make the IO 
> non-blocking could be very substantial, so I would be more than happy to just 
> kick the can down the road for now ;) 
> 
>>> removal of the buffer prefetch in BoundedBlockingSubpartitionReader did not
>>> help, I've already tried that (there are still other problematic code
>>> paths, e.g. releasePartition). 
> 
> 
> Are there other problematic parts besides releasePartition that you have 
> already analysed? Maybe it would be better to just try moving those calls 
> out of the `ResultPartitionManager` call stack somehow?
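> 
> (To make it concrete, I mean something roughly along these lines. This is 
> purely a hypothetical sketch, none of it exists in Flink today, and the 
> helper class, executor size and method names are made up: hand the blocking 
> view creation to a dedicated IO executor and complete the request 
> asynchronously, so the Netty server thread itself never blocks.)
> 
>     import java.io.IOException;
>     import java.util.concurrent.CompletableFuture;
>     import java.util.concurrent.CompletionException;
>     import java.util.concurrent.ExecutorService;
>     import java.util.concurrent.Executors;
>     
>     import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
>     import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
>     import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
>     import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
>     
>     // Hypothetical helper (not existing Flink code): run the potentially blocking
>     // createSubpartitionView off the Netty event loop.
>     class AsyncViewCreation {
>     
>         private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
>     
>         CompletableFuture<ResultSubpartitionView> createViewAsync(
>                 ResultPartitionProvider provider,
>                 ResultPartitionID partitionId,
>                 int subpartitionIndex,
>                 BufferAvailabilityListener listener) {
>             return CompletableFuture.supplyAsync(() -> {
>                 try {
>                     // blocking disk IO happens here, on the IO executor
>                     return provider.createSubpartitionView(partitionId, subpartitionIndex, listener);
>                 } catch (IOException e) {
>                     throw new CompletionException(e);
>                 }
>             }, ioExecutor);
>         }
>     }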
> 
>>> Let me think about how to get a relevant CPU graph from the TM, it's kind
>>> of hard to target a "defective node". 
> 
> Thanks, I know it’s non-trivial, but I would guess you do not have to target 
> a “defective node”. If the defective node blocks for ~2 minutes during the 
> failure, I’m pretty sure other nodes are constantly being blocked for seconds 
> at a time, and profiler results from such nodes would allow us to confirm the 
> issue and better understand what exactly is happening.
> 
>>> Anyway, attached are some graphs from such a busy node at the time of failure.
> 
> I didn’t get/see any graphs?
> 
>>> Is there any special reason for the synchronization I don't see? I have a
>>> feeling it's only for synchronizing `registeredPartitions` map access and
>>> that it's perfectly ok not to synchronize `createSubpartitionView` and
>>> `releasePartition` calls.
> 
> I’m not sure. Removing this lock definitely increases concurrency and thus 
> the potential for race conditions, especially on the resource-releasing 
> paths. After briefly looking at the code, I didn’t find any obvious issue, 
> but there are some callbacks/notifications happening, and generally speaking 
> resource-releasing paths are pretty hard to reason about.
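> 
> For reference, I understand the change you are describing to be roughly the 
> following. This is a sketch written from memory of the 1.9 code, so method 
> names and error handling may well differ from your actual patch: keep the 
> `registeredPartitions` map thread-safe via a ConcurrentHashMap and drop the 
> coarse synchronized blocks, so one Netty thread blocked in 
> `createSubpartitionView` no longer stalls the others.
> 
>     import java.io.IOException;
>     import java.util.concurrent.ConcurrentHashMap;
>     import java.util.concurrent.ConcurrentMap;
>     
>     import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
>     import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
>     import org.apache.flink.runtime.io.network.partition.ResultPartition;
>     import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
>     import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
>     
>     // Sketch only, not the actual patch: the map itself is thread-safe,
>     // the blocking calls are no longer under a global lock.
>     class LockFreeResultPartitionManager {
>     
>         private final ConcurrentMap<ResultPartitionID, ResultPartition> registeredPartitions =
>                 new ConcurrentHashMap<>();
>     
>         void registerResultPartition(ResultPartition partition) {
>             ResultPartition previous =
>                     registeredPartitions.putIfAbsent(partition.getPartitionId(), partition);
>             if (previous != null) {
>                 throw new IllegalStateException("Result partition already registered.");
>             }
>         }
>     
>         ResultSubpartitionView createSubpartitionView(
>                 ResultPartitionID partitionId,
>                 int subpartitionIndex,
>                 BufferAvailabilityListener listener) throws IOException {
>             ResultPartition partition = registeredPartitions.get(partitionId);
>             if (partition == null) {
>                 throw new PartitionNotFoundException(partitionId);
>             }
>             // potentially blocking (disk IO), but no longer under a global lock
>             return partition.createSubpartitionView(subpartitionIndex, listener);
>         }
>     
>         void releasePartition(ResultPartitionID partitionId, Throwable cause) {
>             ResultPartition partition = registeredPartitions.remove(partitionId);
>             if (partition != null) {
>                 // heavy IO (e.g. deleting the backing file) also happens off the lock
>                 partition.release(cause);
>             }
>         }
>     }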
> 
> Zhijiang might spot something, as he had a good eye for catching such 
> problems in the past.
> 
> Besides that, you could just run a couple of (10? 20?) Travis runs. All of 
> the ITCases from various modules (connectors, flink-tests, …) are pretty good 
> at catching race conditions in the network stack.
> 
> Piotrek
> 
>> On 29 Jan 2020, at 17:05, David Morávek <david.mora...@gmail.com> wrote:
>> 
>> Just to clarify, these are bare-metal nodes (128G RAM, 16 CPUs +
>> hyperthreading, 4x HDDs, 10G network), which run YARN, HDFS and HBase.
>> 
>> D.
>> 
>> On Wed, Jan 29, 2020 at 5:03 PM David Morávek <david.mora...@gmail.com>
>> wrote:
>> 
>>> Hi Piotr,
>>> 
>>> removal of the buffer prefetch in BoundedBlockingSubpartitionReader did not
>>> help, I've already tried that (there are still other problematic code
>>> paths, e.g. releasePartition). I think it's perfectly ok to perform IO ops
>>> in netty threads, we just have to make sure we can leverage multiple
>>> threads at once. Synchronization in ResultPartitionManager effectively
>>> decreases parallelism to one, and "netty tasks / unprocessed messages" keep
>>> piling up.
>>> 
>>> Removing synchronization *did solve* the problem for me, because it
>>> allows Flink to leverage the whole netty event loop pool and it's ok to
>>> have a single thread blocked for a little while (we can still accept
>>> connections with other threads).
>>> 
>>> Let me think about how to get a relevant CPU graph from the TM, it's kind
>>> of hard to target a "defective node". Anyway, attached are some graphs from
>>> such a busy node at the time of failure.
>>> 
>>> Is there any special reason for the synchronization I don't see? I have a
>>> feeling it's only for synchronizing `registeredPartitions` map access and
>>> that it's perfectly ok not to synchronize `createSubpartitionView` and
>>> `releasePartition` calls.
>>> 
>>> Thanks,
>>> D.
>>> 
>>> On Wed, Jan 29, 2020 at 4:45 PM Piotr Nowojski <pi...@ververica.com>
>>> wrote:
>>> 
>>>> Hi David,
>>>> 
>>>>> with high load and IO waits
>>>> 
>>>> How high are the values we are talking about?
>>>> 
>>>> Could you attach a CPU profiler and post the results somehow? Which
>>>> threads are busy on what call trees?
>>>> 
>>>> 
>>>> 
>>>> 
>>>> Regarding the idea of removing the locks in the
>>>> `ResultPartitionManager`: I guess it could help a bit, but I wouldn’t
>>>> expect it to solve the problem fully.
>>>> 
>>>> Doing blocking IO in the Netty threads is already asking for trouble,
>>>> as even without locks it can block and crash many things (like
>>>> createSubpartitionView). But one improvement that we might take a look at
>>>> is that the `BoundedBlockingSubpartitionReader` constructor does a blocking
>>>> call to pre-fetch a buffer. If not for that, the code inside
>>>> `ResultPartitionManager` would probably be non-blocking, or at least much
>>>> less blocking.
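>>>> 
>>>> (Roughly what I have in mind, as an illustration with hypothetical types
>>>> rather than Flink’s actual classes: defer the blocking prefetch from the
>>>> reader’s constructor to the first read, so that creating the view itself
>>>> stays cheap.)
>>>> 
>>>>     import java.io.IOException;
>>>>     import java.nio.ByteBuffer;
>>>>     
>>>>     // Illustration of lazy prefetching with made-up types; a null result from
>>>>     // nextBuffer() means end of data.
>>>>     final class LazyPrefetchReader {
>>>>     
>>>>         interface DataReader {
>>>>             ByteBuffer nextBuffer() throws IOException; // may block on disk IO
>>>>         }
>>>>     
>>>>         private final DataReader dataReader;
>>>>         private ByteBuffer nextBuffer;
>>>>     
>>>>         LazyPrefetchReader(DataReader dataReader) {
>>>>             this.dataReader = dataReader;
>>>>             // note: no dataReader.nextBuffer() call here, unlike the current constructor
>>>>         }
>>>>     
>>>>         ByteBuffer getNextBuffer() throws IOException {
>>>>             if (nextBuffer == null) {
>>>>                 nextBuffer = dataReader.nextBuffer(); // first (blocking) read deferred to here
>>>>             }
>>>>             ByteBuffer current = nextBuffer;
>>>>             nextBuffer = null;
>>>>             return current;
>>>>         }
>>>>     }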
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 29 Jan 2020, at 14:05, Stephan Ewen <se...@apache.org> wrote:
>>>>> 
>>>>> /CC Piotr and Zhijiang
>>>>> 
>>>>> Sounds reasonable at first glance. Would like to hear Piotr's and
>>>>> Zhijiang's take, though; they know that code better than I do.
>>>>> 
>>>>> On Wed, Jan 29, 2020 at 1:58 PM David Morávek <david.mora...@gmail.com> wrote:
>>>>> Hi Stephan,
>>>>> 
>>>>> I've actually managed to narrow the problem down to blocked netty server
>>>>> threads. I'm using 1.9.1 with a few custom patches
>>>>> <https://github.com/dmvk/flink/commits/1.9.1-szn>, which are not relevant
>>>>> to this issue.
>>>>> 
>>>>> To highlight the problem, I've added these checks to ResultPartitionManager
>>>>> <https://gist.github.com/dmvk/8be27e8574777222ebb7243dfceae67d>, which
>>>>> measure how long "netty tasks" take to execute (monitor acquisition +
>>>>> actual execution).
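>>>>> 
>>>>> The checks are roughly of the following shape (a simplified sketch; the
>>>>> exact code is in the gist above, and the helper name here is made up):
>>>>> wrap each handler call, time the monitor acquisition plus execution, and
>>>>> log the slow ones.
>>>>> 
>>>>>     import java.util.concurrent.Callable;
>>>>>     
>>>>>     import org.slf4j.Logger;
>>>>>     import org.slf4j.LoggerFactory;
>>>>>     
>>>>>     // Simplified sketch of the timing check, not the exact gist content.
>>>>>     final class NettyTaskTimer {
>>>>>     
>>>>>         private static final Logger LOG = LoggerFactory.getLogger(NettyTaskTimer.class);
>>>>>     
>>>>>         // e.g. timed("createSubpartitionView", () -> createSubpartitionView(...))
>>>>>         static <T> T timed(String name, Callable<T> task) throws Exception {
>>>>>             long start = System.currentTimeMillis();
>>>>>             try {
>>>>>                 return task.call();
>>>>>             } finally {
>>>>>                 LOG.warn("{}: {}ms", name, System.currentTimeMillis() - start);
>>>>>             }
>>>>>         }
>>>>>     }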
>>>>> 
>>>>> We indeed have a pretty busy cluster, with high load and IO waits (mostly
>>>>> due to ongoing shuffles and computations). From the measurements I can see
>>>>> numbers like:
>>>>> 
>>>>> createSubpartitionView: 129255ms
>>>>> createSubpartitionView: 129333ms
>>>>> createSubpartitionView: 129353ms
>>>>> createSubpartitionView: 129354ms
>>>>> createSubpartitionView: 144419ms
>>>>> createSubpartitionView: 144653ms
>>>>> createSubpartitionView: 144759ms
>>>>> createSubpartitionView: 144905ms
>>>>> releasePartition: 145218ms
>>>>> releasePartition: 145250ms
>>>>> releasePartition: 145256ms
>>>>> releasePartition: 145263ms
>>>>> 
>>>>> These vary a lot, depending on what other pipelines are being
>>>>> simultaneously executed. These numbers imply that for at least 145 seconds
>>>>> (which is greater than the connection timeout), the taskmanager was not
>>>>> able to accept any connection (because of netty internals
>>>>> <https://github.com/netty/netty/issues/240>).
>>>>> 
>>>>> Switching to the *file*-backed BoundedData implementation didn't help,
>>>>> because there are still heavy IO ops being executed by netty threads while
>>>>> the monitor is held (e.g. deletion of the backing file).
>>>>> 
>>>>> 1) I've tried more fine-grained locking (locking a single partition
>>>>> instead of the partitionManager). This helped a little, but the pipeline
>>>>> is still unable to finish in some cases.
>>>>> 
>>>>> 2) Currently I'm trying to completely remove locking from
>>>>> ResultPartitionManager, as it seems only relevant for internal
>>>>> data-structure access (it can be replaced with java.concurrent). I think
>>>>> this should also have an impact on overall job completion times in some
>>>>> cases.
>>>>> 
>>>>> What do you think?
>>>>> 
>>>>> On Tue, Jan 28, 2020 at 9:53 PM Stephan Ewen <se...@apache.org> wrote:
>>>>> 
>>>>>> Hi!
>>>>>> 
>>>>>> Concerning JAR files: I think this has nothing to do with it, it is a
>>>> batch
>>>>>> shuffle after all. The previous stage must have completed already.
>>>>>> 
>>>>>> A few things that come to my mind:
>>>>>> - What Flink version are you using? 1.9?
>>>>>> - Are you sure that the source TaskManager is still running? Earlier
>>>>>> Flink versions had an issue with releasing TMs too early, sometimes
>>>> before
>>>>>> the result was fetched by a consumer.
>>>>>> - The buffer creation on the sender / netty server side is more expensive
>>>>>> than necessary, but should be nowhere near expensive enough to cause a
>>>>>> stall.
>>>>>> 
>>>>>> Can you elaborate on what the shared lock is that all server threads
>>>>>> are using?
>>>>>> 
>>>>>> Best,
>>>>>> Stephan
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>>> In case of a large jar, wouldn't this happen in previous stages as well
>>>>>>>> (if so, this should not be the case)?
>>>>>>> 
>>>>>>> I’m not exactly sure how jars are distributed, but if they are being
>>>>>>> sent/uploaded from one node (or some other static/fixed number, like
>>>>>>> uploading to and reading from a DFS) to all, this might not scale well.
>>>>>>> Also, your dev deployment might not be stressing network/storage/something
>>>>>>> as much as the production deployment, which can also affect the time to
>>>>>>> deploy the job.
>>>>>>> 
>>>>>>> What’s your job size? (How large is the jar uploaded to Flink?)
>>>>>>> 
>>>>>>> Also, there might be other factors in play here. For example, if you are
>>>>>>> using Flink's job mode (not standalone), the time to start up a Flink node
>>>>>>> might be too long: some nodes are already up and running and they time out
>>>>>>> waiting for others to start up.
>>>>>>> 
>>>>>>>> Also there shouldn't be any state involved
>>>>>>>> (unless Beam IOs use it internally).
>>>>>>> 
>>>>>>> My bad. Instead of
>>>>>>> 
>>>>>>>> - data size per single job run ranging from 100G to 1TB
>>>>>>> 
>>>>>>> I read it as a state size of 100G to 1TB.
>>>>>>> 
>>>>>>> Piotrek
>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi David,
>>>>>>>>> 
>>>>>>>>> The usual cause for a connection timeout is a long deployment, for
>>>>>>>>> example if your job's jar is large and takes a long time to distribute
>>>>>>>>> across the cluster. I’m not sure whether large state could affect this
>>>>>>>>> as well. Are you sure that’s not the case?
>>>>>>>>> 
>>>>>>>>> I haven’t heard about the thing you are suggesting before, but indeed,
>>>>>>>>> theoretically it could happen. Reading from mmap’ed sub-partitions could
>>>>>>>>> block the Netty threads if the kernel decides to drop an mmap’ed page and
>>>>>>>>> it has to be read back from disk. Could you check your CPU and disk IO
>>>>>>>>> usage? This should be visible as high IOWait CPU usage. Could you, for
>>>>>>>>> example, post a couple of sample results of the
>>>>>>>>> 
>>>>>>>>>      iostat -xm 2
>>>>>>>>> 
>>>>>>>>> command from some representative TaskManager? If the disks are indeed
>>>>>>>>> overloaded, changing Flink’s config option
>>>>>>>>> 
>>>>>>>>>      taskmanager.network.bounded-blocking-subpartition-type
>>>>>>>>> 
>>>>>>>>> from its default to `file` could solve the problem. FYI, this option is
>>>>>>>>> renamed in 1.10 to
>>>>>>>>> 
>>>>>>>>>      taskmanager.network.blocking-shuffle.type
>>>>>>>>> 
>>>>>>>>> and its default value will be `file`.
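>>>>>>>>> 
>>>>>>>>> In other words, just to spell it out, that would be the following line in
>>>>>>>>> flink-conf.yaml (assuming the usual config syntax):
>>>>>>>>> 
>>>>>>>>>      # 1.9.x option name; renamed to taskmanager.network.blocking-shuffle.type in 1.10
>>>>>>>>>      taskmanager.network.bounded-blocking-subpartition-type: file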
>>>>>>>>> 
>>>>>>>>> We would appreciate it if you could get back to us with the results!
>>>>>>>>> 
>>>>>>>>> Piotrek
>>>>>>>>> 
>>>>>>>>>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi David,
>>>>>>>>>> 
>>>>>>>>>> I'm unfortunately not familiar with these parts of Flink, but I'm
>>>>>>>>>> pulling in Piotr, who might be able to tell you more.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>> 
>>>>>>>>>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hello community,
>>>>>>>>>>> 
>>>>>>>>>>> I'm currently struggling with an Apache Beam batch pipeline on top of
>>>>>>>>>>> Flink. The pipeline runs smoothly in smaller environments, but in
>>>>>>>>>>> production it always ends up with a `connection timeout` in one of the
>>>>>>>>>>> last shuffle phases.
>>>>>>>>>>> 
>>>>>>>>>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not reachable.
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>>>>>>>>>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> ...
>>>>>>>>>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: ##########/10.249.28.39:25709
>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>>>>>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>>>>>>>>     at ...
>>>>>>>>>>> 
>>>>>>>>>>> Basically the pipeline looks as follows:
>>>>>>>>>>> 
>>>>>>>>>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy
>>>>>>>>> computation
>>>>>>>>>>> - few secs per element) -> write to multiple outputs (~4)
>>>>>>>>>>> 
>>>>>>>>>>> - cluster size: 100 tms
>>>>>>>>>>> - slots per tm: 4
>>>>>>>>>>> - data size per single job run ranging from 100G to 1TB
>>>>>>>>>>> - job paralelism: 400
>>>>>>>>>>> 
>>>>>>>>>>> I've tried to increase netty's
>>>>>>>>>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Also,
>>>>>>>>>>> increasing the number of netty threads did not help. The JVM performs ok
>>>>>>>>>>> (no OOMs, GC pauses, ...). The connect backlog defaults to 128.
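>>>>>>>>>>> 
>>>>>>>>>>> (For reference, the flink-conf.yaml knobs I've been tuning are the ones
>>>>>>>>>>> below; the concrete values here are just examples, not what we actually run:)
>>>>>>>>>>> 
>>>>>>>>>>>      # example values only
>>>>>>>>>>>      taskmanager.network.netty.client.connectTimeoutSec: 600
>>>>>>>>>>>      taskmanager.network.netty.server.numThreads: 16
>>>>>>>>>>>      taskmanager.network.netty.client.numThreads: 16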
>>>>>>>>>>> 
>>>>>>>>>>> This is probably caused by netty threads being blocked on the server side.
>>>>>>>>>>> All these threads share the same lock, so increasing the number of threads
>>>>>>>>>>> won't help.
>>>>>>>>>>> 
>>>>>>>>>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0
>>>>>>>>>>> tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
>>>>>>>>>>> java.lang.Thread.State: RUNNABLE
>>>>>>>>>>>     at java.lang.Number.<init>(Number.java:55)
>>>>>>>>>>>     at java.lang.Long.<init>(Long.java:947)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
>>>>>>>>>>>     at java.lang.reflect.Field.get(Field.java:393)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
>>>>>>>>>>>     - locked <0x00000006d822e180> (a java.lang.Object)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
>>>>>>>>>>>     - locked <0x00000006cad32578> (a java.util.HashMap)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
>>>>>>>>>>>     - locked <0x000000079767ff38> (a java.lang.Object)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
>>>>>>>>>>>     at
>>>>>>>>>>> org.apache.flink.runtime.io <
>>>> http://org.apache.flink.runtime.io/>
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> .network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
>>>>>>>>>>>     at
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
>>>>>>>>>>> 
>>>>>>>>>>> This may be related to the mmap-backed BoundedData implementation, where
>>>>>>>>>>> `nextBuffer` seems to be somewhat expensive (reflection, skipping empty
>>>>>>>>>>> buffers?). Just to note, the last phases only shuffle metadata (kilobyte
>>>>>>>>>>> scale), but the job parallelism remains the same (400) due to the nature
>>>>>>>>>>> of Beam.
>>>>>>>>>>> 
>>>>>>>>>>> Does this sound familiar to anyone? Do you have any suggestions on how
>>>>>>>>>>> to solve this?
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for help,
>>>>>>>>>>> David
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
> 
