Thanks, Xintong, for the detailed explanation of the memory fraction. I have
increased the memory fraction now.

As I increase the defaultParallelism, I keep getting this error:

org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not reachable.
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
    at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connecting the channel failed: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
    ... 7 more
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + '/10.9.239.218:45544' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    ... 1 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.239.218:45544
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    ... 6 more
Caused by: java.net.ConnectException: Connection timed out
    ... 10 more


On Wed, May 27, 2020 at 7:14 PM Xintong Song <tonysong...@gmail.com> wrote:

> Ah, I guess I had misunderstood what you meant.
>
>> Below 18000 tasks, the Flink Job is able to start up.
>> Even though I increased the number of slots, it still works when 312
>> slots are being used.
>>
> When you said "it still works", I thought you meant that you had increased
> the parallelism but the job still executed as if the parallelism had not
> been increased. From your latest reply, it seems the job's parallelism is
> indeed increased, but then it runs into failures.
>
> The reason you run into the "Insufficient number of network buffers"
> exception is that with more tasks in your job, more inter-task data
> transmission channels are needed, and thus more memory for network buffers.
>
> To increase the network memory size, the following configuration options,
> which you already found, are the relevant ones.
>
>    - taskmanager.network.memory.fraction
>    - taskmanager.network.memory.max
>    - taskmanager.network.memory.min
>
> Please be aware that `taskmanager.memory.task.off-heap.size` is not
> related to network memory, and is only available in Flink 1.10 and above
> while you're using 1.9.1 as suggested by the screenshots.
>
> The network memory size is calculated as `min(max(some_total_value *
> network_fraction, network_min), network_max)`. According to the error
> message, your current network memory size is `85922 buffers * 32KB/buffer
> = 2685MB`, smaller than your "max" (4gb). That means increasing the "max"
> does not help in your case. It is the "fraction" that you need to increase.
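A rough sketch of the arithmetic above, in case it helps anyone following along. The helper names and the 16 GiB total are assumptions made up for illustration (the thread never states the actual `some_total_value`); only the 85922 buffers, the 32768-byte buffer size, and the fraction/min/max settings come from the messages here, and the `mb`/`gb` values are treated as binary units.

```python
MIB = 1024 * 1024
GIB = 1024 * MIB
BUFFER_SIZE = 32768  # bytes per buffer, taken from the error message


def network_memory_bytes(total_bytes, fraction, min_bytes, max_bytes):
    """Clamp total * fraction between min and max, per the formula above."""
    return int(min(max(total_bytes * fraction, min_bytes), max_bytes))


def buffers_to_mib(num_buffers, buffer_size=BUFFER_SIZE):
    """Convert a buffer count into MiB of network memory."""
    return num_buffers * buffer_size / MIB


# Figure from the error message: 85922 buffers of 32768 bytes each.
print(int(buffers_to_mib(85922)))  # 2685

# Hypothetical total of 16 GiB (NOT from the thread), with the posted min/max.
total = 16 * GIB
base = network_memory_bytes(total, 0.15, 500 * MIB, 4 * GIB)
bigger_max = network_memory_bytes(total, 0.15, 500 * MIB, 24 * GIB)
bigger_fraction = network_memory_bytes(total, 0.30, 500 * MIB, 24 * GIB)
print(base == bigger_max)      # True: raising only "max" changes nothing
print(bigger_fraction > base)  # True: raising "fraction" grows the pool
```

With these made-up numbers the computed size sits below "max", so only the fraction moves it; once the fraction is raised far enough, "max" becomes the limit and has to go up as well.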
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi Xintong,
>> Looks like the issue is not fully resolved :( Attaching 2 screenshots of
>> the memory consumption of 1 of the TaskManagers.
>>
>> To increase the off-heap direct memory that is being used up, do I change
>> this:
>>  taskmanager.memory.task.off-heap.size: 5gb
>>
>> I had increased taskmanager.network.memory.max to 24gb,
>> which seems excessive.
>>
>> 1 of the errors I saw in the Flink logs:
>>
>> java.io.IOException: Insufficient number of network buffers: required 1,
>> but only 0 available. The total number of network buffers is currently set
>> to 85922 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>> at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>
>> TIA,
>>
>>
>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Thanks so much, Xintong for guiding me through this. I looked at the
>>> Flink logs to see the errors.
>>> I had to set taskmanager.network.memory.max: 4gb
>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>> Now I am able to increase the number of tasks (aka task vertices).
>>>
>>> taskmanager.network.memory.fraction: 0.15
>>> taskmanager.network.memory.max: 4gb
>>> taskmanager.network.memory.min: 500mb
>>> akka.ask.timeout: 240s
>>>
>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <tonysong...@gmail.com>
>>> wrote:
>>>
>>>> Could you also explain how you set the parallelism when getting this
>>>> execution plan?
>>>> I'm asking because this JSON file itself only shows the resulting
>>>> execution plan. It is not clear to me what is not working as expected in
>>>> your case, e.g., you set the parallelism for an operator to 10 but the
>>>> execution plan only shows 5.
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <bvija...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Xintong,
>>>>> Thanks for the excellent clarification on tasks.
>>>>>
>>>>> The sample screenshot I attached above didn't reflect the slots used
>>>>> or the task limit I was running into.
>>>>>
>>>>> I am attaching my execution plan here. Please let me know how I can
>>>>> increase the number of tasks, aka parallelism. As I increase the
>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>
>>>>> BTW, https://flink.apache.org/visualizer/ is a great place to start
>>>>> for viewing this.
>>>>> TIA,
>>>>>
>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <tonysong...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>> increase tasks slightly.
>>>>>>
>>>>>> That's weird. I don't think the number of network memory buffers has
>>>>>> anything to do with the number of tasks.
>>>>>>
>>>>>> Let me try to clarify a few things.
>>>>>>
>>>>>> Please be aware that how many tasks a Flink job has and how many
>>>>>> slots a Flink cluster has are two different things.
>>>>>> - The number of tasks is determined by your job's parallelism and
>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>> parallelism 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>>>>> tasks in total.
>>>>>> - The number of slots is determined by the number of TMs and
>>>>>> slots-per-TM.
>>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>>> enough for executing all your tasks. The number of slots needed for
>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>> vertices. In the above example, you would need 4 slots, because that
>>>>>> is the max among all the vertices' parallelisms (2, 3, 4).
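The A/B/C example above can be written out as a small sketch. The function names are made up for illustration and are not Flink APIs; the slot count assumes the default behaviour described above, where the job needs as many slots as its highest vertex parallelism.

```python
def total_tasks(parallelisms):
    """Number of tasks: the sum of the parallelism of every vertex."""
    return sum(parallelisms.values())


def required_slots(parallelisms):
    """Slots needed by default: the max parallelism among the vertices."""
    return max(parallelisms.values())


def available_slots(num_task_managers, slots_per_tm):
    """Slots offered by the cluster: number of TMs times slots-per-TM."""
    return num_task_managers * slots_per_tm


# Job graph from the example: vertices A, B, C with parallelism 2, 3, 4.
job = {"A": 2, "B": 3, "C": 4}
print(total_tasks(job))     # 9
print(required_slots(job))  # 4

# Hypothetical cluster of 2 TMs with 2 slots each: just enough for this job.
print(available_slots(2, 2) >= required_slots(job))  # True
```

Adding slots beyond `required_slots` does not add tasks; only a higher parallelism does.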
>>>>>>
>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>> total (not around 18000; the dark box shows total tasks while the green
>>>>>> box shows running tasks), and 600 slots are in use (658 - 58),
>>>>>> suggesting that the max parallelism of your job graph vertices is 600.
>>>>>>
>>>>>> If you want to increase the number of tasks, you should increase your
>>>>>> job parallelism. There are several ways to do that.
>>>>>>
>>>>>>    - In your job code (assuming you are using the DataStream API)
>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>       parallelism for all operators.
>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>       parallelism for a specific operator. (Only supported for
>>>>>>       subclasses of `SingleOutputStreamOperator`.)
>>>>>>    - When submitting your job, use `-p <parallelism>` as an argument
>>>>>>    for the `flink run` command, to set parallelism for all operators.
>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>>>>>    default parallelism for your jobs. This will be used for jobs that
>>>>>>    have not set parallelism with either of the above methods.
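One way to picture how the options above fall back to one another is the sketch below. It is only a model, not Flink code: `resolve_parallelism` is a hypothetical helper, and the ordering (operator setting first, then the environment setting, then `-p`, then `parallelism.default`) is an assumption based on my reading of the Flink documentation, so please check it against your version.

```python
def resolve_parallelism(operator=None, environment=None, cli=None,
                        config_default=1):
    """Return the first parallelism that is set, from most to least specific.

    operator       -- value passed to setParallelism() on a single operator
    environment    -- value passed to StreamExecutionEnvironment#setParallelism()
    cli            -- value of `-p` given to `flink run`
    config_default -- `parallelism.default` from flink-conf.yaml
    """
    for value in (operator, environment, cli):
        if value is not None:
            return value
    return config_default


# Nothing set in code or on the command line: the config default applies.
print(resolve_parallelism(config_default=8))                     # 8
# `-p 600` on the command line overrides the config default.
print(resolve_parallelism(cli=600, config_default=8))            # 600
# An operator-level setting wins over everything else.
print(resolve_parallelism(operator=10, environment=4, cli=600))  # 10
```

The numbers passed in are arbitrary examples, apart from 600, which is the max vertex parallelism read off the screenshot earlier in the thread.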
>>>>>>
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>> bvija...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xintong,
>>>>>>> Thx for your reply.  Increasing network memory buffers (fraction,
>>>>>>> min, max) seems to increase tasks slightly.
>>>>>>>
>>>>>>> Streaming job
>>>>>>> Standalone
>>>>>>>
>>>>>>> Vijay
>>>>>>>
>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Vijay,
>>>>>>>>
>>>>>>>> I don't think your problem is related to the number of open files.
>>>>>>>> The parallelism of your job is decided before it actually tries to
>>>>>>>> open the files. And if the OS limit for open files is reached, you
>>>>>>>> should see a job execution failure, instead of a successful execution
>>>>>>>> with a lower parallelism.
>>>>>>>>
>>>>>>>> Could you share some more information about your use case?
>>>>>>>>
>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>    batch processing job?
>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>    - It would be helpful if you can share the Flink logs.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I have increased the number of slots available, but the job is not
>>>>>>>>> using all the slots and runs into this limit of approximately 18000
>>>>>>>>> tasks. Looking into the source code, it seems to be opening a file -
>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I am
>>>>>>>>> confused about is that the ulimit is per machine but the
>>>>>>>>> ExecutionGraph is across many machines. Please pardon my ignorance
>>>>>>>>> here. Does the number of tasks equate to the number of open files?
>>>>>>>>> I am using 15 slots per TaskManager on AWS m5.4xlarge, which has
>>>>>>>>> 16 vCPUs.
>>>>>>>>>
>>>>>>>>> TIA.
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <
>>>>>>>>> bvija...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Flink Dashboard UI seems to show a hard limit of around 18000 for
>>>>>>>>>> the Tasks column on an Ubuntu Linux box.
>>>>>>>>>> I kept increasing the number of slots per task manager up to 15 and
>>>>>>>>>> the number of slots increased to 705, but the number of tasks
>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is able
>>>>>>>>>> to start up.
>>>>>>>>>> Even though I increased the number of slots, it still works when
>>>>>>>>>> 312 slots are being used.
>>>>>>>>>>
>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>
>>>>>>>>>> What knob can I tune to increase the number of Tasks ?
>>>>>>>>>>
>>>>>>>>>> Pls find attached the Flink Dashboard UI.
>>>>>>>>>>
>>>>>>>>>> TIA,
>>>>>>>>>>
>>>>>>>>>>
