Thanks Caizhi, this was very helpful. // ah
From: Caizhi Weng <tsreape...@gmail.com>
Sent: Thursday, August 26, 2021 10:41 PM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: 1.9 to 1.11 Managed Memory Migration Questions

Hi!

I've read the first mail again and discovered that the direct memory OOM occurs when the job is writing to the sink, not when data is being transferred between tasks over the network. I'm not familiar with HDFS, but I guess writing to HDFS requires some direct memory. A detailed stack trace might confirm this.

> How does this relate with the overall TaskManager process memory

See [1] for the detailed memory model. In short, task.off-heap.size is a part of the overall task manager process memory that is dedicated to user code (by user code I mean UDFs, sources, sinks, etc.). Network off-heap memory is only used for shuffling data between tasks and will not help with writing to HDFS. Likewise, managed memory is only used by operators such as joins and aggregations.

> is there a way to make this scale along with it for jobs that process larger batches of data?

There is currently no way to do so, but I don't see why it is needed. For larger batches of data (I suppose you will increase the parallelism for larger data), YARN will allocate more task managers (as long as you don't increase the number of slots each task manager provides). All of these memory settings are per task manager, so as the number of task managers grows, the total amount of off-heap memory grows with it.

> What does having a default value of 0 bytes mean?

Most sources and sinks do not use direct memory, so a default value of 0 is reasonable. This part of memory only needs to be allocated when the user requires it (for example, in this case you're using an HDFS sink).
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote on Friday, August 27, 2021 at 5:16 AM:

Hi Caizhi, thanks for responding.

The networking keys you suggested didn’t help, but I found that adding ‘taskmanager.memory.task.off-heap.size’ with a value of ‘1g’ led to a successful job. I can see in this property’s documentation [1] that the default value is 0 bytes:

  Task Off-Heap Memory size for TaskExecutors. This is the size of off heap memory (JVM direct memory and native memory) reserved for tasks. The configured value will be fully counted when Flink calculates the JVM max direct memory size parameter.
  0 bytes (default)

From the mem setup page [2]:

  The off-heap memory which is allocated by user code should be accounted for in task off-heap memory.

A few points of clarification, if you would:

1. How does this relate with the overall TaskManager process memory, and is there a way to make this scale along with it for jobs that process larger batches of data?
2. What does having a default value of 0 bytes mean?
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#taskmanager-memory-task-off-heap-size
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html#configure-off-heap-memory-direct-or-native

// ah

From: Caizhi Weng <tsreape...@gmail.com>
Sent: Wednesday, August 25, 2021 10:47 PM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: 1.9 to 1.11 Managed Memory Migration Questions

Hi!

> Why does this ~30% memory reduction happen?

I don't know how memory was calculated in Flink 1.9, but this 1.11 memory allocation result is reasonable. In 1.11, managed memory, network memory and JVM overhead memory all have their own default sizes or fractions (managed memory 40%, network memory 10% with a 1g max, JVM overhead memory 10% with a 1g max; see [1]), while heap memory doesn't. So a 5.8GB heap (about 12G - 2G - 40%) and 4.3G of managed memory (about 40%) are explainable.

> How would you suggest discerning what properties we should have a look at?
Network shuffling memory now has its own configuration key, taskmanager.memory.network.fraction (along with ...network.min and ...network.max). Also see [1] and [2] for more keys related to the task manager's memory.

[1] https://github.com/apache/flink/blob/release-1.11/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote on Thursday, August 26, 2021 at 9:07 AM:

Hi folks,

We’re about halfway through migrating our YARN batch processing applications from Flink 1.9 to 1.11, and are currently tackling the memory configuration migration. Our test application’s sink failed with the following exception while writing to HDFS:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased.
Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

We submit our applications through a Flink YARN session with -ytm, -yjm etc. We don’t have any memory configuration options set aside from ‘taskmanager.network.bounded-blocking-subpartition-type: file’, which I see is now deprecated and replaced with a new option that defaults to ‘file’ (which works for us!), so nearly everything else is at its default. We haven’t made any configuration changes thus far as we’re still combing through the migration instructions, but I did have some questions about what I observed.

1. I observed that an application run with “-ytm 12288” on 1.9 receives 8.47GB of JVM Heap space and 5.95GB of Flink Managed Memory space (as reported by the ApplicationMaster), whereas on 1.11 it receives 5.79GB of JVM Heap space and 4.30GB of Flink Managed Memory space. Why does this ~30% memory reduction happen?
2. Piggybacking off point 1, on 1.9 we were not explicitly setting off-heap memory parameters. How would you suggest discerning what properties we should have a look at?

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws.
For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices
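
The 1.11 memory split described in the replies above (fixed fractions for managed, network and JVM overhead memory, with heap getting the remainder) can be worked through as a rough calculation. The sketch below is not Flink code: the function name derive_tm_memory is made up for illustration, and every default in it (256m metaspace, 128m framework heap and off-heap, the 192m and 64m minimums) is an assumption about the 1.11 defaults that should be checked against TaskManagerOptions. It also assumes that "-ytm 12288" sets the total process size and that the JVM direct memory limit is the sum of framework off-heap, task off-heap and network memory.

```python
def clamp(value, low, high):
    """Limit value to the closed range [low, high]."""
    return max(low, min(value, high))


def derive_tm_memory(process_mb, task_off_heap_mb=0.0):
    """Split a TaskManager process size (in MiB) into memory components.

    All defaults here are assumed Flink 1.11 values, not read from Flink.
    """
    jvm_metaspace = 256.0                                    # assumed default
    # JVM overhead: assumed 10% of the process size, clamped to 192m..1g.
    jvm_overhead = clamp(0.10 * process_mb, 192.0, 1024.0)
    total_flink = process_mb - jvm_metaspace - jvm_overhead

    managed = 0.40 * total_flink                             # managed fraction 40%
    # Network memory: 10% of total Flink memory, clamped to 64m..1g (assumed).
    network = clamp(0.10 * total_flink, 64.0, 1024.0)
    framework_heap = 128.0                                   # assumed default
    framework_off_heap = 128.0                               # assumed default

    # Task heap has no default of its own: it receives whatever is left.
    task_heap = (total_flink - managed - network
                 - framework_heap - framework_off_heap - task_off_heap_mb)

    return {
        "total_flink": total_flink,
        "managed": managed,
        "network": network,
        "task_heap": task_heap,
        "jvm_heap": framework_heap + task_heap,
        # Assumed composition of the JVM max direct memory size.
        "max_direct": framework_off_heap + task_off_heap_mb + network,
    }


if __name__ == "__main__":
    for off_heap in (0.0, 1024.0):
        parts = derive_tm_memory(12288.0, task_off_heap_mb=off_heap)
        print(f"task off-heap = {off_heap:.0f} MiB")
        for name, size in parts.items():
            print(f"  {name:<12} {size / 1024:.2f} GiB")
```

Under these assumed defaults, managed memory for a 12288 MiB process works out to about 4.30 GiB, in line with the figure reported in the thread. Setting task off-heap to 1g raises the direct memory limit by the same amount and takes it out of task heap. The heap figure from this sketch is only indicative and need not match what the ApplicationMaster reports.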