Thanks, Xintong, for the detailed explanation of the memory fraction. I have increased the memory fraction now.
As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
        at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
        at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
        ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
        ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
        at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
        ... 6 more
Caused by: java.net.ConnectException: Connection timed out
        ... 10 more
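For what it's worth, a bare TCP connect test like the sketch below can help rule out a plain network problem (firewalling/security groups, or a TaskManager that has already died) between the hosts. Only the IP address and port are taken from the stack trace above; the class name and the 10-second timeout are arbitrary choices.

    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class ConnectCheck {
        public static void main(String[] args) {
            // Address and port copied from the stack trace; the timeout is arbitrary.
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress("10.9.239.218", 45544), 10_000);
                System.out.println("TCP connect to the TaskManager data port succeeded");
            } catch (Exception e) {
                System.out.println("TCP connect failed: " + e);
            }
        }
    }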
On Wed, May 27, 2020 at 7:14 PM Xintong Song <tonysong...@gmail.com> wrote:

> Ah, I guess I had misunderstood what you meant.
>
>> Below 18000 tasks, the Flink Job is able to start up.
>> Even though I increased the number of slots, it still works when 312
>> slots are being used.
>
> When you said "it still works", I thought that you increased the
> parallelism but the job was still executed as if the parallelism had not
> been increased. From your latest reply, it seems the job's parallelism is
> indeed increased, but then it runs into failures.
>
> The reason you run into the "Insufficient number of network buffers"
> exception is that with more tasks in your job, more inter-task data
> transmission channels, and thus more memory for network buffers, are
> needed.
>
> To increase the network memory size, the following configuration options,
> as you already found, are related.
>
> - taskmanager.network.memory.fraction
> - taskmanager.network.memory.max
> - taskmanager.network.memory.min
>
> Please be aware that `taskmanager.memory.task.off-heap.size` is not
> related to network memory, and is only available in Flink 1.10 and above,
> while you're using 1.9.1 as suggested by the screenshots.
>
> The network memory size is calculated as `min(max(some_total_value *
> network_fraction, network_min), network_max)`. According to the error
> message, your current network memory size is `85922 buffers * 32KB/buffer
> = 2685MB`, smaller than your "max" (4gb). That means increasing the "max"
> does not help in your case. It is the "fraction" that you need to increase.
>
> Thank you~
>
> Xintong Song
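To sanity-check that formula against the numbers in the exception quoted below, here is a rough back-of-the-envelope sketch. The total memory value is back-derived from the 85922-buffer figure and the 0.15 fraction (85922 * 32 KB / 0.15, roughly 17.5 GB, rounded up a little here), so it is an assumption rather than something read from my actual configuration; the class name is arbitrary.

    public class NetworkMemoryCheck {
        public static void main(String[] args) {
            // fraction 0.15, min 500mb, max 4gb and 32 KB buffers are the values from this thread;
            // totalBytes is back-derived and therefore only an assumption.
            long totalBytes = 18_000L << 20;   // ~17.6 GB, assumed
            double fraction = 0.15;            // taskmanager.network.memory.fraction
            long minBytes   = 500L << 20;      // taskmanager.network.memory.min
            long maxBytes   = 4096L << 20;     // taskmanager.network.memory.max
            long pageBytes  = 32 << 10;        // 32 KB network buffer (segment) size

            long networkBytes = Math.min(Math.max((long) (totalBytes * fraction), minBytes), maxBytes);
            long numBuffers   = networkBytes / pageBytes;

            System.out.println("network memory (MB): " + (networkBytes >> 20)); // ~2700 MB, below the 4gb max
            System.out.println("network buffers:     " + numBuffers);           // ~86400, close to the 85922 in the error
        }
    }

With these numbers the fraction term is the one that binds, which matches the advice above: raising the fraction (and, once `total * fraction` exceeds 4gb, also the max) is what actually buys more buffers.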
> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>
>> Hi Xintong,
>> Looks like the issue is not fully resolved :( Attaching 2 screenshots of
>> the memory consumption of 1 of the TaskManagers.
>>
>> To increase the direct (off-heap) memory that is being used up, do I
>> change this:
>> taskmanager.memory.task.off-heap.size: 5gb
>>
>> I had increased taskmanager.network.memory.max: 24gb, which seems
>> excessive.
>>
>> One of the errors I saw in the Flink logs:
>>
>> java.io.IOException: Insufficient number of network buffers: required 1,
>> but only 0 available. The total number of network buffers is currently set
>> to 85922 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>         at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>         at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>
>> TIA,
>>
>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>
>>> Thanks so much, Xintong, for guiding me through this. I looked at the
>>> Flink logs to see the errors.
>>> I had to change taskmanager.network.memory.max: 4gb
>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>> Now, I am able to increase the number of tasks, aka task vertices.
>>>
>>> taskmanager.network.memory.fraction: 0.15
>>> taskmanager.network.memory.max: 4gb
>>> taskmanager.network.memory.min: 500mb
>>> akka.ask.timeout: 240s
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong...@gmail.com> wrote:
>>>
>>>> Could you also explain how you set the parallelism when getting this
>>>> execution plan?
>>>> I'm asking because this json file itself only shows the resulting
>>>> execution plan. It is not clear to me what is not working as expected in
>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>> execution plan only shows 5.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>>
>>>>> Hi Xintong,
>>>>> Thanks for the excellent clarification of tasks.
>>>>>
>>>>> I attached a sample screenshot above, and it didn't reflect the slots
>>>>> used and the tasks limit I was running into in that pic.
>>>>>
>>>>> I am attaching my execution plan here. Please let me know how I can
>>>>> increase the number of tasks, aka the parallelism. As I increase the
>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>
>>>>> BTW - https://flink.apache.org/visualizer/ is a great start to see
>>>>> this.
>>>>> TIA,
>>>>>
>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>
>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>> increase tasks slightly.
>>>>>>
>>>>>> That's weird. I don't think the number of network memory buffers has
>>>>>> anything to do with the number of tasks.
>>>>>>
>>>>>> Let me try to clarify a few things.
>>>>>>
>>>>>> Please be aware that how many tasks a Flink job has and how many
>>>>>> slots a Flink cluster has are two different things.
>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>> parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4) tasks
>>>>>> in total.
>>>>>> - The number of slots is decided by the number of TMs and slots-per-TM.
>>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>>> enough for executing all your tasks. The number of slots needed for
>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>> vertices. Take the above example: you would need 4 slots, because that
>>>>>> is the max among all the vertices' parallelisms (2, 3, 4).
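Just to spell out the arithmetic in that example as code (the parallelism values 2, 3, 4 are exactly the ones from the A/B/C example above; the class name is arbitrary):

    import java.util.stream.IntStream;

    public class TasksVsSlots {
        public static void main(String[] args) {
            int[] vertexParallelism = {2, 3, 4};  // vertices A, B, C

            int totalTasks  = IntStream.of(vertexParallelism).sum();             // 2 + 3 + 4 = 9 tasks
            int slotsNeeded = IntStream.of(vertexParallelism).max().getAsInt();  // max(2, 3, 4) = 4 slots

            System.out.println("tasks = " + totalTasks + ", slots needed = " + slotsNeeded);
        }
    }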
>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>> total (not around 18000; the dark box shows total tasks while the green
>>>>>> box shows running tasks), and 600 slots are in use (658 - 58), suggesting
>>>>>> that the max parallelism of your job graph vertices is 600.
>>>>>>
>>>>>> If you want to increase the number of tasks, you should increase your
>>>>>> job parallelism. There are several ways to do that.
>>>>>>
>>>>>> - In your job code (assuming you are using the DataStream API):
>>>>>>   - Use `StreamExecutionEnvironment#setParallelism()` to set the
>>>>>>     parallelism for all operators.
>>>>>>   - Use `SingleOutputStreamOperator#setParallelism()` to set the
>>>>>>     parallelism for a specific operator. (Only supported for
>>>>>>     subclasses of `SingleOutputStreamOperator`.)
>>>>>> - When submitting your job, use `-p <parallelism>` as an argument for
>>>>>>   the `flink run` command, to set the parallelism for all operators.
>>>>>> - Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>>>>>   default parallelism for your jobs. This will be used for jobs that
>>>>>>   have not set the parallelism with any of the above methods.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
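For reference, here is a minimal DataStream sketch of the first two options in the list above. The parallelism values (600 for the job default, 100 for the one overridden operator) are only illustrative, and the operators themselves are placeholders; the comments note where `-p` and `parallelism.default` would have a similar effect.

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Job-wide default, similar in effect to `flink run -p 600 ...` or
            // `parallelism.default: 600` in flink-conf.yaml (the code-level setting wins).
            env.setParallelism(600);

            env.fromElements(1L, 2L, 3L)             // non-parallel example source, stays at parallelism 1
               .map(new MapFunction<Long, Long>() {  // runs at the job default (600)
                   @Override
                   public Long map(Long value) { return value + 1; }
               })
               .filter(new FilterFunction<Long>() {  // per-operator override
                   @Override
                   public boolean filter(Long value) { return value % 2 == 0; }
               }).setParallelism(100)
               .print();

            env.execute("parallelism demo");
        }
    }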
>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xintong,
>>>>>>> Thanks for your reply. Increasing the network memory buffers (fraction,
>>>>>>> min, max) seems to increase the tasks slightly.
>>>>>>>
>>>>>>> Streaming job
>>>>>>> Standalone
>>>>>>>
>>>>>>> Vijay
>>>>>>>
>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Vijay,
>>>>>>>>
>>>>>>>> I don't think your problem is related to the number of open files.
>>>>>>>> The parallelism of your job is decided before Flink actually tries to
>>>>>>>> open the files. And if the OS limit for open files is reached, you
>>>>>>>> should see a job execution failure, instead of a successful execution
>>>>>>>> with a lower parallelism.
>>>>>>>>
>>>>>>>> Could you share some more information about your use case?
>>>>>>>>
>>>>>>>> - What kind of job are you executing? Is it a streaming or batch
>>>>>>>>   processing job?
>>>>>>>> - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>> - It would be helpful if you can share the Flink logs.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I have increased the number of slots available, but the job does not
>>>>>>>>> use all the slots and instead runs into this approximate 18000-task
>>>>>>>>> limit. Looking into the source code, it seems to be opening files -
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>> So, do I have to tune the ulimit or something similar at the Ubuntu
>>>>>>>>> OS level to increase the number of tasks available? What I am confused
>>>>>>>>> about is that the ulimit is per machine, but the ExecutionGraph spans
>>>>>>>>> many machines. Please pardon my ignorance here. Does the number of
>>>>>>>>> tasks equate to the number of open files? I am using 15 slots per
>>>>>>>>> TaskManager on AWS m5.4xlarge, which has 16 vCPUs.
>>>>>>>>>
>>>>>>>>> TIA.
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit of around 18000 in
>>>>>>>>>> the Tasks column on an Ubuntu Linux box.
>>>>>>>>>> I kept increasing the number of slots per task manager up to 15, and
>>>>>>>>>> the number of slots increased to 705, but the tasks stayed at around
>>>>>>>>>> 18000. Below 18000 tasks, the Flink job is able to start up.
>>>>>>>>>> Even though I increased the number of slots, it still works when 312
>>>>>>>>>> slots are being used.
>>>>>>>>>>
>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>
>>>>>>>>>> What knob can I tune to increase the number of tasks?
>>>>>>>>>>
>>>>>>>>>> Please find the Flink Dashboard UI attached.
>>>>>>>>>>
>>>>>>>>>> TIA,