----- Original Message -----
From: Simon Frei <simon.f...@spire.com>
To: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>, user <user@flink.apache.org>
Subject: Re: unexpected high mem. usage / potential config misinterpretation
Date: 2024-09-19 04:11


Hi Alexis,
 
Thanks for chiming in. To my knowledge and experience, that's not how it works: we can consistently run this job in batch mode on less data, though still considerably more data than would fit in memory. And even on the full data, the initial source tasks always succeed. To my understanding, Flink in batch mode writes all the data produced by a source/operator to disk. It can even run on less memory than the equivalent streaming job, because it can process a small part of a partition at a time from/to disk, without having to keep state for all the data in memory at once. At least that's my observation and my understanding of e.g. this documentation:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/#batch-execution-mode
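For concreteness, by "batch mode" I mean the DataStream runtime execution mode, roughly as in this simplified sketch (class and names are just illustrative, not our actual job setup):

        import org.apache.flink.api.common.RuntimeExecutionMode;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class BatchModeSketch {
            public static void main(String[] args) throws Exception {
                // Same DataStream program as in streaming mode, but executed by the
                // batch scheduler: intermediate results are materialized to disk as
                // blocking shuffles instead of being streamed between running tasks.
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setRuntimeMode(RuntimeExecutionMode.BATCH);
                // ... define sources, operators and sinks as usual, then:
                // env.execute("job name");
            }
        }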
 
Best,
Simon
 

From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
Date: Wednesday, 18 September 2024 at 19:45
To: Simon Frei <simon.f...@spire.com>
Cc: user <user@flink.apache.org>
Subject: Re: unexpected high mem. usage / potential config misinterpretation

Hi Simon,

I hope someone corrects me if I'm wrong, but just based on "batch mode processing terabytes of data", I feel batch mode may be the issue. I am under the impression that batch mode forces everything emitted by the sources to RAM before any downstream operators do anything, so even if each parallel task of your source runs in a different task manager and loads a subset of your data, they might each end up trying to load terabytes into RAM.

Hopefully someone more knowledgeable about batch mode can comment.

Regards,
Alexis.

On Wed, 18 Sept 2024, 18:04 Simon Frei, <simon.f...@spire.com> wrote:

Hi,
 
tl;dr:
A Flink streaming API job run in batch mode uses far more resident memory than expected, resulting in system OOM kills / JVM native memory allocation failures. We would appreciate a look over our config/assumptions to spot any obvious mistakes.
 
Longer form:
 
My colleague and I have been troubleshooting a large batch job for a long time, and we still see behaviour around Flink's memory usage that we cannot explain. My hope is that by explaining our configuration and observations, someone can spot a misconception. In the ideal case I can then send a PR for the documentation to make that misconception less likely for other users.
 
I'll start with an overview/"story-like" form, and then below that are some 
numbers/configs.
 
This is a streaming job run in batch mode, processing terabytes of data, sourcing from and sinking to compressed files in S3. In between there are a few simple decoding and filter operations, then two processors with our main business logic, and finally a few simple transformations and reduce steps. While the reduce and sink writer tasks run, the Flink TM Java process uses much more resident memory than expected from the configuration, i.e. more than the configured process memory, and that leads to failures: either the system OOM killer intervenes or the JVM is unable to mmap. I know that the writers use native memory, e.g. for Avro deflate compression, which is a native method (a small illustrative snippet follows below), and the IO likely uses some native memory as well. We now configure 5g of task off-heap memory to compensate for any such native memory usage, but still encounter higher memory usage. Even 5g seems way too much for some compression buffers and IO, let alone more than that. So currently my main theory is that I misunderstand something about the memory-related config, e.g. that task slots factor into used/allocated memory.
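As an illustration of that deflate path, a plain Avro container-file writer looks roughly like this (illustrative snippet only, not our actual sink code):

        import java.io.File;
        import org.apache.avro.Schema;
        import org.apache.avro.SchemaBuilder;
        import org.apache.avro.file.CodecFactory;
        import org.apache.avro.file.DataFileWriter;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericDatumWriter;
        import org.apache.avro.generic.GenericRecord;

        public class AvroDeflateSketch {
            public static void main(String[] args) throws Exception {
                Schema schema = SchemaBuilder.record("Example").fields()
                        .requiredString("payload").endRecord();
                try (DataFileWriter<GenericRecord> writer =
                        new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                    // Deflate compression is backed by java.util.zip.Deflater, i.e.
                    // native zlib, whose working buffers live outside the JVM heap.
                    writer.setCodec(CodecFactory.deflateCodec(6));
                    writer.create(schema, new File("/tmp/example.avro"));
                    GenericRecord record = new GenericData.Record(schema);
                    record.put("payload", "hello");
                    writer.append(record);
                }
            }
        }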
 
During the late stages of the job, i.e. during the reduce and sink operations, we observe much higher memory usage than expected. The increase doesn't happen slowly and gradually over time, but quickly when those tasks start. This is an example of ps output for one TM:

PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND

        14961  130 94.8 32213536 30957104 ?   Sl   Sep12 7453:03 
/usr/lib/jvm/java-11-amazon-corretto.aarch64/bin/java -Xmx7435661840 
-Xms7435661840 -XX:MaxDirectMemorySize=9663676416 
-XX:MaxMetaspaceSize=268435456 
-Dlog.file=/var/log/hadoop-yarn/containers/application_1724829444792_0007/container_1724829444792_0007_01_000330/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.network.min=2147483648b -D taskmanager.cpu.cores=4.0 -D 
taskmanager.memory.task.off-heap.size=5368709120b
 -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none 
-D taskmanager.memory.jvm-overhead.min=1073741824b -D 
taskmanager.memory.framework.off-heap.size=2147483648b -D 
taskmanager.memory.network.max=2147483648b -D 
taskmanager.memory.framework.heap.size=134217728b
 -D taskmanager.memory.managed.size=7328288240b -D 
taskmanager.memory.task.heap.size=7301444112b -D 
taskmanager.numberOfTaskSlots=4 -D 
taskmanager.memory.jvm-overhead.max=1073741824b --configDir . 
-Djobmanager.rpc.address=ip-172-30-119-251.us-west-2.compute.internal
 -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
-Dweb.tmpdir=/tmp/flink-web-81fb345d-de64-4002-bd78-4454ca901566 
-Djobmanager.rpc.port=42195 
-Drest.address=ip-172-30-119-251.us-west-2.compute.internal 
-Djobmanager.memory.jvm-overhead.max=322122552b
 -Djobmanager.memory.jvm-overhead.min=322122552b 
-Dtaskmanager.resource-id=container_1724829444792_0007_01_000330 
-Dinternal.taskmanager.resource-id.metadata=ip-172-30-116-113.us-west-2.compute.internal:8041
 -Djobmanager.memory.jvm-metaspace.size=268435456b
 -Djobmanager.memory.heap.size=2496449736b
 
For easier readability:
RSS = 30957104kB = 29.5GB
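
And summing the memory components from the -D flags in that command line (binary units, rounded):

        heap (-Xmx, task + framework)   7435661840 B  ~  6.9 GiB
        managed                         7328288240 B  ~  6.8 GiB
        network                         2147483648 B  =  2.0 GiB
        framework off-heap              2147483648 B  =  2.0 GiB
        task off-heap                   5368709120 B  =  5.0 GiB
        jvm-metaspace                    268435456 B  =  0.25 GiB
        jvm-overhead                    1073741824 B  =  1.0 GiB
        ------------------------------------------------------
        total                          25769803776 B  = 24.0 GiB

So everything Flink accounts for adds up to the configured 24g process size, while the observed RSS of ~29.5 GiB is roughly 5.5 GiB above that.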
 
Then this is the relevant bit of our config file. It includes explanations of 
how we came up with those numbers, so here's where I hope someone can quickly 
tell
 me where I am wrong :)
 
        # We configure yarn to provide 24g to containers, rest for system. It turns out
        # yarn doesn't enforce that memory limit though (we exceeded it at times).
        taskmanager.memory.process.size: 24g
        # We have 4 cores on the machine, so the flink default of task slots == vcpu is
        # fine (as in we make use of all cores).
        taskmanager.numberOfTaskSlots: 4

        # Flink memory calculation/provisioning isn't entirely straight-forward, thus
        # explaining steps here (best resource:
        # https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model).
        #
        # Settings from 2022 (partly based on flink documentation recommendations):
        # Enable sort-based blocking shuffle and associated config recommendations.
        # This will become the default in 1.15.
        # https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/
        taskmanager.network.blocking-shuffle.compression.enabled: true
        taskmanager.network.sort-shuffle.min-parallelism: 1
        # batch-shuffle is taken from overall framework.off-heap, i.e. the latter needs
        # to be at least as big as the former.
        taskmanager.memory.framework.off-heap.batch-shuffle.size: 1024m
        taskmanager.memory.framework.off-heap.size: 2g
        # Number of 32K buffers (taskmanager.memory.segment-size), taken from network memory.
        taskmanager.network.sort-shuffle.min-buffers: 2048
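        # For reference: 2048 buffers * 32KiB = 64MiB which, if I read the docs
        # right, is the minimum reserved per sort-shuffle result partition, well
        # within the 2g of network memory configured below.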
        # Not sure why/if we need this, e.g. if it's related to the above. Failures
        # happened when removing it :)
        taskmanager.memory.network.min: 2g
        taskmanager.memory.network.max: 2g

        # We leave jvm settings at defaults:
        #   flink = process - jvm-overhead - jvm-metaspace = 24g - 1g - 0.25g = 22.75g
        #
        # OOMs happened due to memory usage not part of flink (managed, network) or JVM
        # heap - some native allocations. To avoid that we reserve ample non-heap task
        # memory (memory flink will leave free). This might be too much, we didn't
        # optimize once it ran fine:
        taskmanager.memory.task.off-heap.size: 5g
        # Managed memory (useful for efficient flink data operations, e.g. sorting,
        # caching) and heap remain to be set:
        #   flink = heap + managed + direct = heap + managed + network + framework + task-off-heap
        #         = heap + managed + 2g + 2g + 5g
        #   -> heap + managed = 22.75g - 2g - 2g - 5g = 13.75g
        # Metrics showed heap usage and garbage collection activity are very low at
        # ~10g, so it's fine to go even a bit lower in favor of more managed memory
        # (used by flink for various operations directly on raw data e.g. sorting,
        # caching, and documented to be needed/useful in batch mode):
        taskmanager.memory.managed.fraction: 0.2f
        #   heap = 13.75g - flink * managed-fraction = 13.75g - 22.75g * 0.2 = 9.2g
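        #   Sanity check: 9.2g heap + 4.55g managed + 2g network + 2g framework
        #   off-heap + 5g task off-heap = 22.75g flink memory, plus 0.25g metaspace
        #   and 1g jvm-overhead = 24g process size.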
 
And just in case, a few things we do not have problems with: heap memory usage/GC activity is low, and there are no errors about missing network buffers or too little managed memory. All metrics we get from Flink/the JVM look perfectly fine; it's just the Java process resident memory usage that grows beyond what we expect, followed by the system-level OOM.
 
I'd be immensely grateful for any pointers/feedback on the above. And I hope I 
can repay the kindness by improving the docs (memory or config), if any 
learnings
 apply to them.
 
Best,
Simon