Hey Xintong, thanks for the explanations. For the first part, can I confirm whether my understanding is correct here:
Akka direct memory is part of framework.off-heap. We also use FlinkKafkaConsumers and FlinkKafkaProducers in our pipeline; because of the netty usage within them, do we need to set a non-zero task.off-heap?

Thanks!
________________________________
From: Xintong Song <tonysong...@gmail.com>
Sent: Wednesday, April 29, 2020 10:53 PM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: Steven Wu <stevenz...@gmail.com>; user@flink.apache.org
Subject: Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

Hi Jiahui,

I'd like to clarify a bit more.

> I think in our case, it was actually the network buffer size that was too large (we were seeing an Akka exception), which happened to be fixed by increasing task.off-heap.size since that just makes the direct memory larger.

Please be aware that 'taskmanager.memory.network.*' only accounts for the network buffer pool used for Flink's data exchange between tasks. There are other direct memory footprints accounted for by task/framework off-heap memory, including some akka/netty direct memory. The network buffer pool has a fixed size. It allocates all of the configured memory at initialization and guarantees not to exceed the configured value. In your case, increasing the configured network memory size should not help, because the entire increase in the direct memory limit will be taken by the network buffer pool, leaving the same amount for other direct memory consumers.

> tweaking jvm-overhead should be a much more common thing to do!

This is not always true. When talking about "off-heap memory", there are actually three categories: direct memory, metaspace, and native memory. Direct memory is controlled by the JVM parameter '-XX:MaxDirectMemorySize', which is set to the sum of Flink's framework/task off-heap memory and network memory. Metaspace is controlled by the JVM parameter '-XX:MaxMetaspaceSize', which is set to 'taskmanager.memory.metaspace.size'. Native memory is not controlled by the JVM.
In Flink, managed memory and jvm-overhead use native memory. That means, if you see a JVM OOM, increasing jvm-overhead should not help.

Thank you~
Xintong Song

On Thu, Apr 30, 2020 at 11:06 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

Hey Xintong, Steven, thanks for the replies!

@Steven Wu thanks for the link! I didn't realize that, even though the different direct memory configs can be set separately, only their sum is used to set the JVM parameter.

I think in our case, it was actually the network buffer size that was too large (we were seeing an Akka exception), which happened to be fixed by increasing task.off-heap.size since that just makes the direct memory larger.

But I agree that most of the time we shouldn't need to change this value at all, and tweaking jvm-overhead should be a much more common thing to do!

I will test out some more pipelines with different resource requirements to understand the memory profiles better!

Thank you guys again!
________________________________
From: Steven Wu <stevenz...@gmail.com>
Sent: Wednesday, April 29, 2020 10:12 AM
To: Xintong Song <tonysong...@gmail.com>
Cc: Jiahui Jiang <qzhzm173...@hotmail.com>; user@flink.apache.org
Subject: Re: Configuring taskmanager.memory.task.off-heap.size in Flink 1.10

Jiahui,

Based on my reading of the docs, for a containerized environment it is probably better to set `taskmanager.memory.process.size` to the container memory limit.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size

Then I typically set `taskmanager.memory.jvm-overhead.max` to allocate some overhead to non-Flink memory.
I think it matches the intention better than 'taskmanager.memory.task.off-heap.size', which is used for calculating the JVM direct memory size.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-jvm-overhead-max

I also found this Flink doc pretty helpful:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

Hope that helps.

Thanks,
Steven

On Tue, Apr 28, 2020 at 8:56 PM Xintong Song <tonysong...@gmail.com> wrote:

Hi Jiahui,

'taskmanager.memory.task.off-heap.size' accounts for the off-heap memory reserved for your job / operators. There are other configuration options that account for off-heap memory used for other purposes, e.g., 'taskmanager.memory.framework.off-heap.size'.

The default 'task.off-heap.size' of 0 only reflects that in most cases user code / operators do not use off-heap memory. Users need to explicitly increase this value if the UDFs or libraries of the job use off-heap memory.

Thank you~
Xintong Song

On Wed, Apr 29, 2020 at 11:07 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:

Hello!

We are migrating our pipeline from Flink 1.8 to 1.10 in Kubernetes.

In the first try, we simply copied the old 'taskmanager.heap.size' over to 'taskmanager.memory.flink.size'. This caused the cluster to OOM. Eventually we had to allocate a small amount of memory to 'taskmanager.memory.task.off-heap.size' for it to stop failing.

But we don't quite understand why this needs to be overridden.

I saw that the default for 'taskmanager.memory.task.off-heap.size' is 0. Does that mean that in most cases task managers won't need off-heap memory? What are some examples where off-heap memory needs to be non-zero?

Thank you!
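
The direct memory accounting described in Xintong's reply can be sketched roughly as follows. This is only an illustration of the arithmetic, not Flink code: the class name, method names, and all sizes are hypothetical and chosen for the example. It assumes, as stated in the thread, that '-XX:MaxDirectMemorySize' is the sum of framework off-heap, task off-heap, and network memory, and that the network buffer pool claims its full configured size at initialization.

```python
from dataclasses import dataclass

MIB = 1024 * 1024


@dataclass(frozen=True)
class DirectMemoryConfig:
    """Hypothetical container for the three direct memory settings, in bytes."""

    framework_off_heap: int  # stands in for taskmanager.memory.framework.off-heap.size
    task_off_heap: int       # stands in for taskmanager.memory.task.off-heap.size
    network: int             # stands in for the derived network memory size

    def max_direct_memory(self) -> int:
        # Per the thread: the JVM direct memory limit is the sum of all three.
        return self.framework_off_heap + self.task_off_heap + self.network

    def non_network_direct_memory(self) -> int:
        # The network buffer pool takes its full share up front, so this is
        # what remains for other direct memory users such as akka/netty.
        return self.max_direct_memory() - self.network


# Illustrative sizes only; they are not recommendations.
base = DirectMemoryConfig(128 * MIB, 0, 256 * MIB)
more_network = DirectMemoryConfig(128 * MIB, 0, 512 * MIB)
more_task = DirectMemoryConfig(128 * MIB, 64 * MIB, 256 * MIB)

for name, cfg in [("base", base), ("more_network", more_network), ("more_task", more_task)]:
    print(name, cfg.max_direct_memory() // MIB, cfg.non_network_direct_memory() // MIB)
```

Under these assumptions, raising the network size raises the JVM limit but leaves the non-network headroom unchanged, while raising task off-heap raises both, which is consistent with the behavior reported earlier in the thread.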