Hi!

Concerning JAR files: I think this has nothing to do with it; it is a batch shuffle, after all. The previous stage must have completed already.
A few things that come to my mind:

- What Flink version are you using? 1.9?
- Are you sure that the source TaskManager is still running? Earlier Flink versions had an issue with releasing TMs too early, sometimes before the result was fetched by a consumer.
- The buffer creation on the sender / netty server side is more expensive than necessary, but should be nowhere near expensive enough to cause a stall. Can you elaborate on what the shared lock is that all server threads are using?

Best,
Stephan

On Tue, Jan 28, 2020 at 4:23 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> > In case of large jar, wouldn't this happen in previous stages as well (if
> > so this should not be the case)?
>
> I'm not exactly sure how jars are distributed, but if they are being
> sent/uploaded from one node (or some other static/fixed number, like uploading
> to and reading from a DFS) to all, this might not scale well. Also
> your dev deployment might not be stressing network/storage/something as
> much as the production deployment, which can also affect the time to deploy the job.
>
> What's your job size? (How large is the jar uploaded to Flink?)
>
> Also there might be other factors in play here, like if you are using
> Flink job mode (not standalone), the time to start up a Flink node might be
> too long. Some nodes are already up and running and they are timing out
> waiting for others to start up.
>
> > Also there shouldn't be any state involved
> > (unless Beam IO's use it internally).
>
> My bad. Instead of
>
> > - data size per single job run ranging from 100G to 1TB
>
> I read state size 100G to 1TB.
>
> Piotrek
>
> On Tue, Jan 28, 2020 at 11:45 AM Piotr Nowojski <pi...@ververica.com> wrote:
>
> >> Hi David,
> >>
> >> The usual cause for a connection timeout is long deployment. For example if
> >> your job's jar is large and takes a long time to distribute across the
> >> cluster. I'm not sure if large state could affect this as well or not. Are
> >> you sure that's not the case?
> >>
> >> The thing you are suggesting I haven't heard about previously, but indeed
> >> theoretically it could happen. Reading from mmap'ed sub-partitions could
> >> block the Netty threads if the kernel decides to drop an mmap'ed page and it has
> >> to be read from the disks. Could you check your CPU and disk IO usage?
> >> This should be visible as high IOWait CPU usage. Could you for example post
> >> a couple of sample results of the
> >>
> >> iostat -xm 2
> >>
> >> command from some representative TaskManager? If indeed the disks are
> >> overloaded, changing Flink's config option
> >>
> >> taskmanager.network.bounded-blocking-subpartition-type
> >>
> >> from the default to `file` could solve the problem. FYI, this option is
> >> renamed in 1.10 to
> >>
> >> taskmanager.network.blocking-shuffle.type
> >>
> >> and its default value will be `file`.
> >>
> >> We would appreciate it if you could get back to us with the results!
> >>
> >> Piotrek
> >>
> >>> On 28 Jan 2020, at 11:03, Till Rohrmann <trohrm...@apache.org> wrote:
> >>>
> >>> Hi David,
> >>>
> >>> I'm unfortunately not familiar with these parts of Flink, but I'm pulling
> >>> in Piotr, who might be able to tell you more.
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Mon, Jan 27, 2020 at 5:58 PM David Morávek <d...@apache.org> wrote:
> >>>
> >>>> Hello community,
> >>>>
> >>>> I'm currently struggling with an Apache Beam batch pipeline on top of
> >>>> Flink. The pipeline runs smoothly in smaller environments, but in
> >>>> production it always ends up with a `connection timeout` in one of the last
> >>>> shuffle phases.
> >>>>
> >>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> >>>> Connection for partition
> >>>> 260ddc26547df92babd1c6d430903b9d@da5da68d6d4600bb272d68172a09760f not
> >>>> reachable.
> >>>> at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> >>>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> >>>> at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> >>>> at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> >>>> at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> >>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> >>>> at java.lang.Thread.run(Thread.java:748)
> >>>> ...
> >>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> >>>> Connection timed out: ##########/10.249.28.39:25709
> >>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> >>>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> >>>> at ...
> >>>>
> >>>> Basically the pipeline looks as follows:
> >>>>
> >>>> read "skewed" sources -> reshuffle -> flatMap (performs a heavy computation,
> >>>> a few secs per element) -> write to multiple outputs (~4)
> >>>>
> >>>> - cluster size: 100 TMs
> >>>> - slots per TM: 4
> >>>> - data size per single job run ranging from 100G to 1TB
> >>>> - job parallelism: 400
> >>>>
> >>>> I've tried to increase the netty
> >>>> `taskmanager.network.netty.client.connectTimeoutSec` with no luck. Also
> >>>> increasing the # of netty threads did not help. The JVM performs ok (no OOMs, GC
> >>>> pauses, ...). The connect backlog defaults to 128.
> >>>>
> >>>> This is probably caused by netty threads being blocked on the server side.
> >>>> All these threads share the same lock, so increasing the number of threads
> >>>> won't help.
> >>>>
> >>>> "Flink Netty Server (0) Thread 2" #131 daemon prio=5 os_prio=0 tid=0x00007f339818e800 nid=0x7e9 runnable [0x00007f334acf5000]
> >>>> java.lang.Thread.State: RUNNABLE
> >>>> at java.lang.Number.<init>(Number.java:55)
> >>>> at java.lang.Long.<init>(Long.java:947)
> >>>> at sun.reflect.UnsafeLongFieldAccessorImpl.get(UnsafeLongFieldAccessorImpl.java:36)
> >>>> at java.lang.reflect.Field.get(Field.java:393)
> >>>> at org.apache.flink.core.memory.HybridMemorySegment.getAddress(HybridMemorySegment.java:420)
> >>>> at org.apache.flink.core.memory.HybridMemorySegment.checkBufferAndGetAddress(HybridMemorySegment.java:434)
> >>>> at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:81)
> >>>> at org.apache.flink.core.memory.HybridMemorySegment.<init>(HybridMemorySegment.java:66)
> >>>> at org.apache.flink.core.memory.MemorySegmentFactory.wrapOffHeapMemory(MemorySegmentFactory.java:130)
> >>>> at org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.sliceNextBuffer(BufferReaderWriterUtil.java:87)
> >>>> at org.apache.flink.runtime.io.network.partition.MemoryMappedBoundedData$BufferSlicer.nextBuffer(MemoryMappedBoundedData.java:240)
> >>>> at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.<init>(BoundedBlockingSubpartitionReader.java:71)
> >>>> at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:201)
> >>>> - locked <0x00000006d822e180> (a java.lang.Object)
> >>>> at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:279)
> >>>> at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:72)
> >>>> - locked <0x00000006cad32578> (a java.util.HashMap)
> >>>> at org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader.requestSubpartitionView(CreditBasedSequenceNumberingViewReader.java:86)
> >>>> - locked <0x000000079767ff38> (a java.lang.Object)
> >>>> at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:102)
> >>>> at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:42)
> >>>> at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> >>>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> >>>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
> >>>> at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
> >>>>
> >>>> This may be related to the mmap-backed BoundedData implementation, where
> >>>> `nextBuffer` seems to be somewhat expensive (reflection, skipping empty
> >>>> buffers?). Just to note, the last phases only shuffle metadata (kilobyte
> >>>> scale), but the job parallelism remains the same due to Beam's nature (400).
> >>>>
> >>>> Does this sound familiar to anyone? Do you have any suggestions how to
> >>>> solve this?
> >>>>
> >>>> Thanks for help,
> >>>> David
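
For reference, the mitigation Piotrek suggests above amounts to a one-line change in flink-conf.yaml. A minimal sketch, using the option names quoted in his reply:

```yaml
# Flink 1.9: serve bounded blocking subpartitions from file reads instead of
# mmap, so page faults cannot stall the shared Netty server threads.
taskmanager.network.bounded-blocking-subpartition-type: file

# Flink 1.10 renames the option (and, per the thread, makes `file` the default):
# taskmanager.network.blocking-shuffle.type: file
```

The change takes effect on TaskManager restart; checking `iostat -xm 2` for high IOWait first, as suggested, confirms whether the disks are actually the bottleneck before switching.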