Hi Stephan and Xintong,

Thanks for the further FLIP-49 feedback.

> - "taskmanager.memory.size" (old main config option) is replaced by
> "taskmanager.memory.total-process.size" which has a different meaning in
> standalone setups. The old option did not subtract metaspace and other
> overhead, while the new option does. That means that with the default config,
> standalone clusters get quite a bit less memory. (independent of managed
> memory going off heap).
> I am wondering if we could interpret "taskmanager.memory.size" as the
> deprecated key for "taskmanager.memory.total-flink.size". That would be in
> line with the old mechanism (assuming managed memory is set to off heap).
> The effect would be that the container size on Yarn/Mesos increases,
> because from "taskmanager.memory.total-flink.size", we need to add overhead
> and metaspace to reach the total process size, rather than cutting off
> memory. But if we want, we could even adjust for that in the active resource
> manager, getting full backwards compatibility on that part.
> Curious to hear more thoughts there.
>
> I believe you mean "taskmanager.heap.size".
>
> I think the problem here is that the legacy "taskmanager.heap.size" was used
> differently in standalone setups and active yarn / mesos setups, and such
> different calculation logics and behaviors are exactly what we want to avoid
> with FLIP-49. Therefore, I'm not in favor of treating
> "taskmanager.memory.total-flink.size" differently for standalone and active
> setups.
>
> I think what we really want is probably mapping "taskmanager.heap.size" to
> different new config options in different setups. How about we mark
> "taskmanager.heap.size" as deprecated key for neither of
> "taskmanager.memory.total-process.size" and
> "taskmanager.memory.total-flink.size".
> Instead, we parse it (if explicitly
> configured) in startup scripts / active resource managers, and set the value
> to "taskmanager.memory.total-flink.size" in the scripts and
> "taskmanager.memory.total-process.size" in active resource managers (if the
> new config options are not configured). We can provide util methods in
> TaskExecutorResourceUtils for such conversions, to keep all the configuration
> logics at one place.

I agree that the problem is that the legacy option 'taskmanager.heap.size' has different semantics for standalone and container setups. We initially had it fall back to 'taskmanager.memory.total-flink.size', but I changed that to align it with the container cut-off. I now see that this changes the standalone setup. +1 for supporting its backwards compatibility differently for standalone/container setups.

> - Mini Cluster tries to imitate exact ratio of memory pools as a standalone
> setup. I get the idea behind that, but I am wondering if it is the right
> approach here.
> For example: I started a relatively large JVM (large heap size of 10 GB)
> as a test. With the current logic, the system tries to reserve an additional
> 6GB for managed memory which is more than there is memory left. When you see
> the error that no memory could be allocated, you need to understand the magic
> of how this is derived.
> I am trying to think about this from the perspective of using "Flink as a
> Library", which the MiniCluster is close to.
> When starting Flink out of a running process, we cannot assume that we
> are the only users of that process and that we can mold the process to our
> demands. I think a fix value for managed memory and network memory would feel
> more natural in such a setup than a mechanism that is tailored towards
> exclusive use of the process.
>
> +1 on having fixed values for managed / shuffle memory.

Also +1 for that, if the user has not specified any of the main options from which memory is derived. We should also log this fixing of managed / shuffle memory.
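
Coming back to the legacy key: a minimal sketch of the 'taskmanager.heap.size' handling discussed above, written in plain Java over a string map rather than Flink's Configuration so that it runs on its own. The class LegacyHeapSizeMapping, its Setup enum and the apply method are hypothetical names, not existing TaskExecutorResourceUtils API, and reading "the new config options are not configured" as "neither of the two new keys is present" is my assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not the actual TaskExecutorResourceUtils API. */
final class LegacyHeapSizeMapping {

    /** Where the configuration is evaluated. */
    enum Setup { STANDALONE, ACTIVE_RESOURCE_MANAGER }

    static final String LEGACY_HEAP = "taskmanager.heap.size";
    static final String TOTAL_FLINK = "taskmanager.memory.total-flink.size";
    static final String TOTAL_PROCESS = "taskmanager.memory.total-process.size";

    private LegacyHeapSizeMapping() {}

    /**
     * Returns a copy of the config in which an explicitly configured legacy
     * key is written to the new key that matches the setup. The copy is left
     * unchanged if the legacy key is absent or if one of the new keys is
     * already set (assumption: either new key counts as "configured").
     */
    static Map<String, String> apply(Map<String, String> config, Setup setup) {
        Map<String, String> result = new HashMap<>(config);
        String legacy = config.get(LEGACY_HEAP);
        boolean newOptionsConfigured =
                config.containsKey(TOTAL_FLINK) || config.containsKey(TOTAL_PROCESS);
        if (legacy == null || newOptionsConfigured) {
            return result;
        }
        // Standalone scripts keep the old "Flink memory" meaning; active
        // resource managers keep the old "container size" meaning.
        String target = (setup == Setup.STANDALONE) ? TOTAL_FLINK : TOTAL_PROCESS;
        result.put(target, legacy);
        return result;
    }
}
```

With only 'taskmanager.heap.size: 1024m' configured, the standalone path ends up with total-flink.size = 1024m and the active resource manager path with total-process.size = 1024m; an explicitly set new option always wins.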

And just noticed: we could also sanity check the framework heap and, if explicitly configured, the task heap against the available JVM heap, and at least log inconsistencies.

> - Some off-heap memory goes into direct memory, some does not. This
> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
> is counted into MaxDirectMemory while "taskmanager.memory.task.off-heap.size"
> is counted as native memory. Maybe we should rename the keys to reflect that.
> There is no one "off heap" memory type after all. Maybe use
> "taskmanager.memory.task.native: XXXmb" and
> "taskmanager.memory.framework.direct: XXXmb" instead?
>
> I believe "taskmanager.memory.task.off-heap.size" is also accounted in the
> max direct memory size limit. The confusion probably comes from that
> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in its
> description while "taskmanager.memory.task.off-heap.size" didn't. That's
> actually because the framework off-heap memory is introduced later in a
> separate commit. We should fix that.
>
> For framework / task off-heap memory, we do not differentiate direct / native
> memory usage. That means the configure value for these two options could be a
> mixture of direct / native memory. Since we do not know the portion of direct
> memory out of the configured value, we have to conservatively account it all
> into the max direct memory size limit.
>
> ==> In that case, I am a bit confused. For the total size calculation, it is
> fine. But why do we then set MaxDirectMemory? It is a difficult parameter,
> and the main reason to set it was (if I recall correctly) to trigger GC based
> on direct memory allocation (to free heap structures that then in turn
> release direct memory). If the limit is anyways too high (because we also
> count native memory in there) such that we can exceed the total process
> (container) memory, why do we set it then?

I always thought of it as an additional safety net for direct allocations, but the GC aspect looks more important.

+1 for fixing the docs for 'taskmanager.memory.task.off-heap.size' and for renaming it to 'direct', as this is what really happens, if we want to support the direct limit more exactly than now.

I also think that it is hard to separate direct / native memory unless we introduce even more options. If a user wants to keep the direct limit tight to a certain value but also use native memory outside of it, she would have to increase something else, like the JVM overhead, to account for it; there is no better way. Having more options to account for native memory outside of the direct limit complicates things, but they can be introduced later if needed.

> - What do you think about renaming "taskmanager.memory.total-flink.size" to
> "taskmanager.memory.flink.size" and "taskmanager.memory.total-process.size"
> to "taskmanager.memory.process.size" (or "taskmanager.memory.jvm.total"). I
> think these keys may be a bit less clumsy (dropping the "total-") without
> loss of expressiveness.
>
> +1 on this.

+1 as well. Another option: 'taskmanager.memory.total-process.size' -> 'taskmanager.memory.jvm.process.size', although the docs can also mention that we mean the JVM process.

> - The network memory keys are now called "taskmanager.memory.shuffle.*". To
> my knowledge, shuffle is usually understood as a redistribution (random, or
> maybe by hash of key). As an example, there are many discussions about
> "shuffle join versus broadcast join", where "shuffle" is the synonym for
> "re-partitioning". We use that memory however for all network operations,
> like forward pipes, broadcasts, receiver-side buffering on checkpoints, etc.
> I find the name "*.shuffle.*" confusing, I am wondering if users would find
> that as well. So throwing in the suggestion to call the options
> "taskmanager.memory.network.*".
>
> +0 on this one. I'm ok with "taskmanager.memory.network.*".
> On the other hand, one can also argue that this part of memory is used by
> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
> directly to the shuffle service components.
>
> ==> In that case, the name "Shuffle Environment" may be a bit incorrect,
> because it is doing not only shuffles as well. The ShuffleEnvironment is also
> more internal, so the name is not too critical. This isn't super high
> priority for me, but if we want to adjust it, better earlier than later.

This is also a somewhat controversial topic for me. Indeed, we have always used 'network' for this concept of task data shuffling over the network, so a different name can confuse existing users. On the other hand, for new users and in the long term, 'network' can mislead people into concluding that all network memory is managed by this option. Also, other types of shuffle might not deal with the network directly at all. By calling it shuffle, we were somewhat biased towards understanding it in terms of map/reduce. It is rather an inter-task data exchange. Maybe then 'taskmanager.memory.shuffle.communication.*' or 'taskmanager.memory.task.shuffle/communication/io/network.*'.

> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys say
> that it also accounts for I/O direct memory, but the parameter is not counted
> into the MaxDirectMemory parameter.
>
> True. Since we already have framework off-heap memory accounted for ad hoc
> direct memory usages, accounting all of jvm-overhead also into max direct
> memory limit seems not necessary. I would suggest to remove "I/O direct
> memory" from the description, and explicitly mention that this option does
> not account for direct memory and will not be accounted into max direct
> memory limit. WDYT?

+1

> - Can make the new ConfigOptions strongly typed with the new configuration
> options. For example, directly use MemorySize typed options. That makes
> validation automatic and saves us from breaking the options later.
> +1.
> Wasn't aware of the new memory type config options.

+1

> ==> Thanks. Do you need help with adjusting this?

I would appreciate it.

Also, a small side note: we now use MemorySize extensively in logs, but it is not always readable, because its string representation is currently only in bytes and is not reduced to kb/mb/etc. for large values. We could at least have some .prettyPrint function to use in logs. And .fromMegabytes/etc. factory methods would make code more readable than .parse(int + "m").

Best,
Andrey

> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote:
>
> Hi Xintong!
>
> Please find my answers inline:
> - "taskmanager.memory.size" (old main config option) is replaced by
> "taskmanager.memory.total-process.size" which has a different meaning in
> standalone setups. The old option did not subtract metaspace and other
> overhead, while the new option does. That means that with the default config,
> standalone clusters get quite a bit less memory. (independent of managed
> memory going off heap).
> I am wondering if we could interpret "taskmanager.memory.size" as the
> deprecated key for "taskmanager.memory.total-flink.size". That would be in
> line with the old mechanism (assuming managed memory is set to off heap).
> The effect would be that the container size on Yarn/Mesos increases,
> because from "taskmanager.memory.total-flink.size", we need to add overhead
> and metaspace to reach the total process size, rather than cutting off
> memory. But if we want, we could even adjust for that in the active resource
> manager, getting full backwards compatibility on that part.
> Curious to hear more thoughts there.
>
> I believe you mean "taskmanager.heap.size".
>
> ==> Yes
>
> I think the problem here is that the legacy "taskmanager.heap.size" was used
> differently in standalone setups and active yarn / mesos setups, and such
> different calculation logics and behaviors are exactly what we want to avoid
> with FLIP-49.
> Therefore, I'm not in favor of treating
> "taskmanager.memory.total-flink.size" differently for standalone and active
> setups.
>
> I think what we really want is probably mapping "taskmanager.heap.size" to
> different new config options in different setups. How about we mark
> "taskmanager.heap.size" as deprecated key for neither of
> "taskmanager.memory.total-process.size" and
> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
> configured) in startup scripts / active resource managers, and set the value
> to "taskmanager.memory.total-flink.size" in the scripts and
> "taskmanager.memory.total-process.size" in active resource managers (if the
> new config options are not configured). We can provide util methods in
> TaskExecutorResourceUtils for such conversions, to keep all the configuration
> logics at one place.
>
> ==> This is pretty much what I meant as well (maybe my description was not
> very clear), so +1 for that mechanism
>
> - Mini Cluster tries to imitate exact ratio of memory pools as a standalone
> setup. I get the idea behind that, but I am wondering if it is the right
> approach here.
> For example: I started a relatively large JVM (large heap size of 10 GB)
> as a test. With the current logic, the system tries to reserve an additional
> 6GB for managed memory which is more than there is memory left. When you see
> the error that no memory could be allocated, you need to understand the magic
> of how this is derived.
> I am trying to think about this from the perspective of using "Flink as a
> Library", which the MiniCluster is close to.
> When starting Flink out of a running process, we cannot assume that we
> are the only users of that process and that we can mold the process to our
> demands. I think a fix value for managed memory and network memory would feel
> more natural in such a setup than a mechanism that is tailored towards
> exclusive use of the process.
>
> +1 on having fixed values for managed / shuffle memory.
>
> ==> Cool, let's also see what Andrey and Till think here.
>
> - Some off-heap memory goes into direct memory, some does not. This
> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
> is counted into MaxDirectMemory while "taskmanager.memory.task.off-heap.size"
> is counted as native memory. Maybe we should rename the keys to reflect that.
> There is no one "off heap" memory type after all. Maybe use
> "taskmanager.memory.task.native: XXXmb" and
> "taskmanager.memory.framework.direct: XXXmb" instead?
>
> I believe "taskmanager.memory.task.off-heap.size" is also accounted in the
> max direct memory size limit. The confusion probably comes from that
> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in its
> description while "taskmanager.memory.task.off-heap.size" didn't. That's
> actually because the framework off-heap memory is introduced later in a
> separate commit. We should fix that.
>
> For framework / task off-heap memory, we do not differentiate direct / native
> memory usage. That means the configure value for these two options could be a
> mixture of direct / native memory. Since we do not know the portion of direct
> memory out of the configured value, we have to conservatively account it all
> into the max direct memory size limit.
>
> ==> In that case, I am a bit confused. For the total size calculation, it is
> fine. But why do we then set MaxDirectMemory? It is a difficult parameter,
> and the main reason to set it was (if I recall correctly) to trigger GC based
> on direct memory allocation (to free heap structures that then in turn
> release direct memory). If the limit is anyways too high (because we also
> count native memory in there) such that we can exceed the total process
> (container) memory, why do we set it then?
>
> - The network memory keys are now called "taskmanager.memory.shuffle.*".
> To my knowledge, shuffle is usually understood as a redistribution (random, or
> maybe by hash of key). As an example, there are many discussions about
> "shuffle join versus broadcast join", where "shuffle" is the synonym for
> "re-partitioning". We use that memory however for all network operations,
> like forward pipes, broadcasts, receiver-side buffering on checkpoints, etc.
> I find the name "*.shuffle.*" confusing, I am wondering if users would find
> that as well. So throwing in the suggestion to call the options
> "taskmanager.memory.network.*".
>
> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the other
> hand, one can also argue that this part of memory is used by
> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
> directly to the shuffle service components.
>
> ==> In that case, the name "Shuffle Environment" may be a bit incorrect,
> because it is doing not only shuffles as well. The ShuffleEnvironment is also
> more internal, so the name is not too critical. This isn't super high
> priority for me, but if we want to adjust it, better earlier than later.
>
> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys say
> that it also accounts for I/O direct memory, but the parameter is not counted
> into the MaxDirectMemory parameter.
>
> True. Since we already have framework off-heap memory accounted for ad hoc
> direct memory usages, accounting all of jvm-overhead also into max direct
> memory limit seems not necessary. I would suggest to remove "I/O direct
> memory" from the description, and explicitly mention that this option does
> not account for direct memory and will not be accounted into max direct
> memory limit. WDYT?
>
> ==> Sounds good.
>
> - Can make the new ConfigOptions strongly typed with the new configuration
> options. For example, directly use MemorySize typed options. That makes
> validation automatic and saves us from breaking the options later.
> +1.
> Wasn't aware of the new memory type config options.
>
> ==> Thanks. Do you need help with adjusting this?
>
> Best,
> Stephan
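
On the MemorySize side note in the reply above: a minimal sketch of what such helpers could look like, written as a standalone class so that it runs without Flink. MemorySizeFormat, prettyPrint and fromMegabytes as defined here are hypothetical and not the existing MemorySize API. Reducing only to the largest unit that divides the value evenly (so that nothing is rounded away in logs) is an assumption, as are the lower-case unit names.

```java
/** Hypothetical helpers; not part of the existing MemorySize class. */
final class MemorySizeFormat {

    private static final String[] UNITS = {"bytes", "kb", "mb", "gb", "tb"};

    private MemorySizeFormat() {}

    /** Converts mebibytes to bytes, failing loudly on overflow. */
    static long fromMegabytes(long megabytes) {
        return Math.multiplyExact(megabytes, 1024L * 1024L);
    }

    /**
     * Renders a byte count in the largest unit that divides it evenly,
     * e.g. 1073741824 becomes "1 gb" while 1000 stays "1000 bytes".
     */
    static String prettyPrint(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be non-negative: " + bytes);
        }
        long value = bytes;
        int unit = 0;
        // Step up one unit at a time while no precision would be lost.
        while (unit < UNITS.length - 1 && value != 0 && value % 1024 == 0) {
            value /= 1024;
            unit++;
        }
        return value + " " + UNITS[unit];
    }
}
```

A log statement could then print prettyPrint(fromMegabytes(1536)) instead of the raw byte count, and call sites could use fromMegabytes(128) rather than parsing "128m".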