Hi Forideal,

which Flink version are you using? If you are using 1.9 or older, have a look at the memory setup [1] and config docs [2]. If you are using 1.10, it should be enough to increase *taskmanager.network.memory.fraction* and *taskmanager.network.memory.max*. You shouldn't use *taskmanager.network.numberOfBuffers* anymore.
In general, your job uses 25 TMs (400 parallelism / 16 slots). So, a fully-connected operator instance (hash) on 1 TM needs the following network channels to communicate with another operator instance: 24 other TMs * 16 slots * 16 slots = 6144 network channels.

You have 4 hash operators, and each channel requires 2 output and 2 input buffers (taskmanager.network.memory.buffers-per-channel), where each buffer needs 32KB (taskmanager.memory.segment-size). That means your TM requires 6144 channels * 4 operators * 4 buffers = 98304 buffers. There are also some floating buffers (taskmanager.network.memory.floating-buffers-per-gate), but at this point you can safely say that your job requires 100k buffers per TM, which is 3.2 GB.

To configure your cluster in 1.10, set *taskmanager.network.memory.max* to a value that is higher than 3.2 GB. Your *taskmanager.network.memory.fraction* must be increased as well (depending on your TM memory size).

To decrease memory consumption, you have the following options:
* Decrease the buffer size; that's mostly useful when you have lots of smaller records. (linear to memory consumption)
* Decrease the parallelism. Only useful if you don't need so much computation power. (linear to memory consumption, as it reduces the number of TMs)
* Decrease the number of slots per TM. If you have 35 TMs, you need at most 12 slots per TM for your job. (quadratic! to memory consumption) You can achieve the same through scheduling options [3].
* Decrease the number of hash operations. Without knowing specifics, you might be able to merge all hashes into one. (linear to memory consumption)
* Decrease the number of exclusive buffers per channel. (linear to memory consumption) This will most likely decrease performance by 10-20%.
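The estimate above can be reproduced with a few lines of Python. This is only a rough sketch of the same back-of-the-envelope arithmetic, not how Flink computes its requirements internally; the function name and its parameters are made up for illustration, and floating buffers are ignored just as in the calculation above.

```python
def estimate_network_buffers(num_tms, slots_per_tm, hash_operators,
                             buffers_per_channel=4,
                             segment_size_bytes=32 * 1024):
    """Rough per-TM estimate (hypothetical helper, ignores floating buffers).

    buffers_per_channel=4 stands for the 2 output + 2 input buffers per
    channel mentioned above; segment_size_bytes is the 32KB segment size.
    """
    # Every local slot talks to every slot on each of the other TMs.
    channels = (num_tms - 1) * slots_per_tm * slots_per_tm
    buffers = channels * hash_operators * buffers_per_channel
    memory_bytes = buffers * segment_size_bytes
    return channels, buffers, memory_bytes


# Setup from this thread: 25 TMs in use, 16 slots each, 4 hash operators.
channels, buffers, memory_bytes = estimate_network_buffers(25, 16, 4)
print(channels, buffers, memory_bytes)
```

Calling the helper with a smaller slots_per_tm shows why that option is the most effective one: the slot count enters the channel count squared, while all other inputs enter linearly.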
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#network-buffers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#setting-the-number-of-network-buffers-directly
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#advanced-scheduling-options

On Mon, Mar 23, 2020 at 1:20 PM forideal <fszw...@163.com> wrote:

> Hi Xintong,
>
> Thank you for your reply.
>
> Do you mean you have 700 slots per TM or in total? How many TMs do you
> have? And how many slots do you have per TM?
>
> I have a Flink Cluster with 35 TMs,each TM has 16 slots.
> cluster info: total TMs=35 ,total slots=560
> Job info: request slot 400
>
> It is after the job is fully initiated?
>
> No,the job can't init.
>
>
> Topology
>
> op1-hash->op2-hash->op3-hash->op4
> |
> |-hash->op5
> op1 parallelism is 200
> op2 parallelism is 400
> op3 parallelism is 400
> op4 parallelism is 400
> op5 parallelism is 400
>
> Best Wishes
> forideal
>
> At 2020-03-20 15:20:07, "Xintong Song" <tonysong...@gmail.com> wrote:
>
> Hi Forideal,
>
> Do you mean you have 700 slots per TM or in total? How many TMs do you
> have? And how many slots do you have per TM?
>
> Also, when is the screenshot taken? It is after the job is fully
> initiated? It seems you only need 1k+ network buffers.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Mar 20, 2020 at 12:07 PM forideal <fszw...@163.com> wrote:
>
>> Hi community
>>
>> This parameter makes me confused.
>>
>> taskmanager.network.numberOfBuffers
>> 700000
>> In my job, i use 700 slots, but ,i have to set the this parameter to
>> 700000.If
>> not,i will get a exception.
>>
>> java.io.IOException: Insufficient number of network buffers:
>> required 700, but only 1 available. The total number of network buffers is
>> currently set to 80000 of 32768 bytes each. You can increase this number by
>> setting the configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>
>> But actually this waste too many resource.
>> Memory Segments
>> Type Count
>> Available 698,838
>> Total 700,000
>> Direct 700,103 21.4 GB 21.4 GB
>> Mapped 0 0 B 0 B
>> Best Wishes
>> forideal