Hi Simon,

I hope someone corrects me if I'm wrong, but just based on "batch mode processing terabytes of data", I suspect batch mode may be the issue. I am under the impression that batch mode buffers everything emitted by the sources in RAM before any downstream operators do anything, so even if each parallel task of your source runs in a different task manager and loads only a subset of your data, each of them might still end up trying to hold terabytes in RAM.
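For reference, and assuming the DataStream API since I don't know how your job is wired up, batch execution mode is typically enabled along these lines (a minimal sketch of my own; the class name and the toy pipeline are just placeholders, not your job):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BatchModeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // With bounded sources, BATCH mode runs the pipeline stage by stage;
            // as far as I understand, each stage's result is fully produced before
            // the downstream stages start consuming it.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            // Toy bounded pipeline, just so the sketch runs end to end.
            env.fromSequence(1, 1_000).print();
            env.execute("batch-mode-sketch");
        }
    }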
Hopefully someone more knowledgeable about batch mode can comment.

Regards,
Alexis

On Wed, 18 Sept 2024, 18:04 Simon Frei, <simon.f...@spire.com> wrote:

> Hi,
>
> tl;dr:
> Flink streaming API job run in batch mode: resident memory usage grows far beyond expectations, resulting in a system OOM kill or a JVM native memory allocation failure - we would appreciate a look over our config/assumptions to potentially spot any obvious mistakes.
>
> Longer form:
>
> My colleague and I have been troubleshooting a large batch job for a long time, and we still see behaviour around Flink's memory usage that we cannot explain. My hope is that by explaining our configuration and observations, someone can spot a misconception. In the ideal case I can then send a PR for the documentation to make that misconception less likely for other users.
>
> I'll start with an overview in "story-like" form; below that are some numbers/configs.
>
> This is a streaming job run in batch mode, processing terabytes of data, sourcing from and sinking to compressed files in S3. In between there are a few simple decoding and filter operations, then two processors with our main business logic, and finally a few simple transformations and reduce steps. While the reduce and sink writer tasks run, we see much higher resident memory usage of the Flink TM java process than expected from the configuration, i.e. higher than the configured process memory. That leads to failures: either the system OOM killer intervenes or the JVM is unable to mmap. I know that the writers do use native memory, e.g. for avro deflate compression, which is a native method. The IO likely uses some native memory as well. We now configure 5g of task off-heap memory to compensate for any such native memory usage, but still see higher memory usage. Even 5g seems far too much for some compression buffers and IO, let alone more than that. So my current main theory is that I misunderstand something about the memory-related config, e.g. that task slots factor into used/allocated memory.
>
> During the late stages of the job, i.e. during the reduce and sink operations, we observe much higher memory usage than expected. The increase isn't happening slowly and gradually over time, but quickly when those tasks start. This is an example of ps output for one TM:
>
> PID   %CPU %MEM    VSZ      RSS      TTY STAT START   TIME COMMAND
> 14961  130 94.8 32213536 30957104   ?   Sl   Sep12 7453:03
>   /usr/lib/jvm/java-11-amazon-corretto.aarch64/bin/java
>   -Xmx7435661840 -Xms7435661840
>   -XX:MaxDirectMemorySize=9663676416 -XX:MaxMetaspaceSize=268435456
>   -Dlog.file=/var/log/hadoop-yarn/containers/application_1724829444792_0007/container_1724829444792_0007_01_000330/taskmanager.log
>   -Dlog4j.configuration=file:./log4j.properties
>   -Dlog4j.configurationFile=file:./log4j.properties
>   org.apache.flink.yarn.YarnTaskExecutorRunner
>   -D taskmanager.memory.network.min=2147483648b
>   -D taskmanager.cpu.cores=4.0
>   -D taskmanager.memory.task.off-heap.size=5368709120b
>   -D taskmanager.memory.jvm-metaspace.size=268435456b
>   -D external-resources=none
>   -D taskmanager.memory.jvm-overhead.min=1073741824b
>   -D taskmanager.memory.framework.off-heap.size=2147483648b
>   -D taskmanager.memory.network.max=2147483648b
>   -D taskmanager.memory.framework.heap.size=134217728b
>   -D taskmanager.memory.managed.size=7328288240b
>   -D taskmanager.memory.task.heap.size=7301444112b
>   -D taskmanager.numberOfTaskSlots=4
>   -D taskmanager.memory.jvm-overhead.max=1073741824b
>   --configDir .
>   -Djobmanager.rpc.address=ip-172-30-119-251.us-west-2.compute.internal
>   -Dweb.port=0
>   -Djobmanager.memory.off-heap.size=134217728b
>   -Dweb.tmpdir=/tmp/flink-web-81fb345d-de64-4002-bd78-4454ca901566
>   -Djobmanager.rpc.port=42195
>   -Drest.address=ip-172-30-119-251.us-west-2.compute.internal
>   -Djobmanager.memory.jvm-overhead.max=322122552b
>   -Djobmanager.memory.jvm-overhead.min=322122552b
>   -Dtaskmanager.resource-id=container_1724829444792_0007_01_000330
>   -Dinternal.taskmanager.resource-id.metadata=ip-172-30-116-113.us-west-2.compute.internal:8041
>   -Djobmanager.memory.jvm-metaspace.size=268435456b
>   -Djobmanager.memory.heap.size=2496449736b
>
> For easier readability:
> RSS = 30957104 kB = 29.5 GB
>
> Then this is the relevant bit of our config file. It includes explanations of how we came up with those numbers, so here's where I hope someone can quickly tell me where I am wrong :)
>
>     # We configure yarn to provide 24g to containers, rest for the system. It turns
>     # out yarn doesn't enforce that memory limit though (we exceeded it at times).
>     taskmanager.memory.process.size: 24g
>     # We have 4 cores on the machine, so the flink default of task slots == vcpus is
>     # fine (as in we make use of all cores).
>     taskmanager.numberOfTaskSlots: 4
>
>     # Flink memory calculation/provisioning isn't entirely straightforward, thus
>     # explaining the steps here (best resource:
>     # https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model).
>     #
>     # Settings from 2022 (partly based on flink documentation recommendations):
>     # Enable sort-based blocking shuffle and associated config recommendations.
>     # This will become the default in 1.15.
>     # https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/
>     taskmanager.network.blocking-shuffle.compression.enabled: true
>     taskmanager.network.sort-shuffle.min-parallelism: 1
>     # batch-shuffle is taken from overall framework.off-heap, i.e. the latter needs
>     # to be at least as big as the former.
>     taskmanager.memory.framework.off-heap.batch-shuffle.size: 1024m
>     taskmanager.memory.framework.off-heap.size: 2g
>     # Number of 32K buffers (taskmanager.memory.segment-size), taken from network memory.
>     taskmanager.network.sort-shuffle.min-buffers: 2048
>     # Not sure why/if we need this, e.g. if it's related to the above. Failures
>     # happened when removing it :)
>     taskmanager.memory.network.min: 2g
>     taskmanager.memory.network.max: 2g
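[A quick aside while quoting, in case it helps: taking the comment above at face value, the 2048 sort-shuffle buffers amount to far less than the 2g of network memory. A rough check of my own, assuming the 32 KiB segment size mentioned in the quoted comment; the class name and framing are mine:]

    public class SortShuffleBufferCheck {
        public static void main(String[] args) {
            long segmentSizeBytes = 32 * 1024L;    // taskmanager.memory.segment-size (32K, per the comment)
            long minBuffers = 2048;                // taskmanager.network.sort-shuffle.min-buffers
            long networkBytes = 2L << 30;          // taskmanager.memory.network.min/max = 2g
            long sortShuffleBytes = minBuffers * segmentSizeBytes;  // 64 MiB
            System.out.printf("sort-shuffle min buffers: %d MiB out of %d MiB network memory%n",
                    sortShuffleBytes >> 20, networkBytes >> 20);
        }
    }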
>
>     # We leave the jvm settings at their defaults:
>     # flink = process - jvm-overhead - jvm-metaspace = 24g - 1g - 0.25g = 22.75g
>     #
>     # OOMs happened due to memory usage that is not part of flink (managed, network)
>     # or the JVM heap - some native allocations. To avoid that we reserve ample
>     # non-heap task memory (memory flink will leave free). This might be too much;
>     # we didn't optimize further once it ran fine:
>     taskmanager.memory.task.off-heap.size: 5g
>     # Managed memory (useful for efficient flink data operations, e.g. sorting,
>     # caching) and heap remain to be set:
>     # flink = heap + managed + direct = heap + managed + network + framework + task-off-heap
>     #       = heap + managed + 2g + 2g + 5g
>     # -> heap + managed = 22.75g - 2g - 2g - 5g = 13.75g
>     # Metrics showed heap usage (resp. garbage collection activity) is very low at
>     # ~10g, so it's fine to go even a bit lower in favor of more managed memory
>     # (used by flink for various operations directly on raw data, e.g. sorting and
>     # caching, and documented to be needed/useful in batch mode):
>     taskmanager.memory.managed.fraction: 0.2f
>     # heap = 13.75g - flink * managed-fraction = 13.75g - 22.75g * 0.2 = 9.2g
>
> And just in case, listing a few things we do not see / do not have problems with: heap memory usage and GC activity are low; there are no errors about missing network buffers or too little managed memory. All metrics we get from Flink/the JVM look perfectly fine; it's just the java process resident memory usage that grows beyond what we expect, followed by the system-level OOM.
>
> I'd be immensely grateful for any pointers/feedback on the above. And I hope I can repay the kindness by improving the docs (memory or config), if any learnings apply to them.
>
> Best,
> Simon
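Coming back to the memory arithmetic in the quoted config comments: written out, the budget looks internally consistent to me. This is just that arithmetic as a small sketch (values in GiB taken from the comments above; it mirrors the quoted calculation rather than Flink's exact byte-level accounting, and the class name is mine):

    public class TmMemoryBudgetSketch {
        public static void main(String[] args) {
            double process = 24.0;          // taskmanager.memory.process.size
            double jvmOverhead = 1.0;       // default, capped at jvm-overhead.max = 1g
            double jvmMetaspace = 0.25;     // default 256m
            double totalFlink = process - jvmOverhead - jvmMetaspace;                       // 22.75

            double network = 2.0;           // taskmanager.memory.network.min/max
            double frameworkOffHeap = 2.0;  // taskmanager.memory.framework.off-heap.size
            double taskOffHeap = 5.0;       // taskmanager.memory.task.off-heap.size
            double heapPlusManaged = totalFlink - network - frameworkOffHeap - taskOffHeap; // 13.75

            double managed = totalFlink * 0.2;            // managed.fraction = 0.2 -> 4.55
            double taskHeap = heapPlusManaged - managed;  // ~9.2 (the 128m default framework
                                                          //  heap is ignored, as in the quote)
            System.out.printf("flink=%.2f  heap+managed=%.2f  managed=%.2f  task heap=%.2f GiB%n",
                    totalFlink, heapPlusManaged, managed, taskHeap);
        }
    }

If that matches your expectations, then whatever drives RSS to ~29.5 GB sits outside this budget, which would fit your observation that the Flink/JVM metrics themselves all look fine.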