Hi Vijay,

The memory configurations in Flink 1.9 and previous versions are indeed complicated and confusing. That is why we made significant changes to them in Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or to the upcoming Flink 1.11, which is very likely to be released this month.
Regarding your questions,

- "Physical Memory" displayed on the web UI stands for the total memory on your machine. This information is retrieved from your OS. It is not related to the network memory calculation. It is displayed mainly for historical reasons.
- The error message means that you have about 26.8 GB of network memory (877118 * 32768 bytes), and your job is trying to use more.
- The "total memory" referred to in the network memory calculation is:
   - jvm-heap + network, if managed memory is configured on-heap (default)
      - According to your screenshot, the managed memory on-heap/off-heap configuration is not touched, so this should be your case.
   - jvm-heap + managed + network, if managed memory is configured off-heap
- The network memory size is actually derived in reverse. Flink reads the max heap size from the JVM (and the managed memory size from the configuration if it is configured off-heap), and derives the network memory size with the following equation.
   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1 - networkFraction) * networkFraction))
   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1 - 0.48) * 0.48)) = 26.8GB

One thing I don't understand is why you only have a 29GB heap when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total" to represent "taskmanager.heap.size" for short. I have also omitted the calculations for when managed memory is configured off-heap.

- Standalone: jvmHeap = total * (1 - networkFraction) = 102GB * (1 - 0.48) = 53GB
- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 39.8GB

Have you specified a custom "-Xmx" parameter?
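
In case it helps to check the numbers, here is a minimal Python sketch of the arithmetic above. The function names are hypothetical (they are not Flink APIs), it assumes managed memory is on-heap, and it uses the simplified formulas from this mail rather than Flink's actual implementation.

```python
# Sketch of the Flink 1.9 memory arithmetic described in this mail.
# All names are illustrative; sizes are in bytes, with GB/MB read as GiB/MiB.

GIB = 1024 ** 3
MIB = 1024 ** 2


def network_mem_from_heap(jvm_max_heap, fraction, net_min, net_max):
    """Derive the network memory size in reverse from the JVM max heap."""
    derived = jvm_max_heap / (1 - fraction) * fraction
    return min(net_max, max(net_min, derived))


def jvm_heap_standalone(total, fraction):
    """Heap for a standalone setup: total minus the network fraction."""
    return total * (1 - fraction)


def jvm_heap_yarn(total, fraction, cutoff_min, cutoff_ratio):
    """Heap on Yarn: the container cutoff is subtracted first."""
    cutoff = max(cutoff_min, total * cutoff_ratio)
    return (total - cutoff) * (1 - fraction)


# Network memory implied by the buffer count in the error message.
buffers_gib = 877118 * 32768 / GIB

# Network memory derived from a 29 GB max heap and a 0.48 fraction.
derived_gib = network_mem_from_heap(29 * GIB, 0.48, 500 * MIB, 50 * GIB) / GIB

# Expected heap sizes for "total" = 102 GB.
standalone_gib = jvm_heap_standalone(102 * GIB, 0.48) / GIB
yarn_gib = jvm_heap_yarn(102 * GIB, 0.48, 600 * MIB, 0.25) / GIB

print(f"buffers: {buffers_gib:.2f} GiB, derived: {derived_gib:.2f} GiB")
print(f"standalone heap: {standalone_gib:.2f} GiB, yarn heap: {yarn_gib:.2f} GiB")
```

With the inputs above, both the buffer count and the reverse derivation come out at roughly 26.8 GB, and the standalone heap at roughly 53 GB, so a 29 GB heap is smaller than either deployment mode would produce from that "total".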
Thank you~

Xintong Song

On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Get this error:
> java.io.IOException: Insufficient number of network buffers: required 2,
> but only 0 available. The total number of network buffers is currently set
> to 877118 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-1420732632]] after [10000 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
>
>
> Followed docs here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>
> network = Min(max, Max(min, fraction x total) //what does Total mean -
> The max JVM heap is used to derive the total memory for the calculation of
> network buffers. - can I see it in the Flink Dashboard ??? 117GB here ?
> = Min(50G, Max(500mb, Max(0.48 * 117G)) ) = MIn(50G, 56.16G)= 50G
> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
> Used this in flink-conf.yaml:
> taskmanager.numberOfTaskSlots: 10
> rest.server.max-content-length: 314572800
> taskmanager.network.memory.fraction: 0.45
> taskmanager.network.memory.max: 50gb
> taskmanager.network.memory.min: 500mb
> akka.ask.timeout: 240s
> cluster.evenly-spread-out-slots: true
> akka.tcp.timeout: 240s
> taskmanager.network.request-backoff.initial: 5000
> taskmanager.network.request-backoff.max: 30000
> web.timeout:1000000
> web.refresh-interval:6000
>
> Saw some old calc about buffers
> (slots/Tm * slots/TM) * #TMs * 4
> =10 * 10 * 47 * 4 = 18,800 buffers.
>
> What am I missing in the network buffer calc ??
>
> TIA,