+1 to not having more options for off-heap memory; that would get confusing fast. We can keep the name "off-heap" for both task and framework memory, as long as they mean the same thing: native plus direct, and fully counted into MaxDirectMemory. I would suggest updating the config descriptions to reflect that.
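A minimal sketch of the derivation under discussion (hypothetical helper names, not Flink's actual classes): because each off-heap budget is "native plus direct" and the direct portion is unknown, both budgets are conservatively counted in full into the JVM's direct memory limit. Including network memory in the sum is an assumption of this sketch.

```java
// Hedged sketch, not Flink's actual implementation: derive the
// -XX:MaxDirectMemorySize JVM argument from the configured budgets.
class MaxDirectMemoryDerivation {

    // Framework and task off-heap are each a mixture of native and direct
    // memory; since the split is unknown, both are counted in full.
    static long deriveMaxDirectMemoryBytes(
            long frameworkOffHeapBytes, long taskOffHeapBytes, long networkBytes) {
        return frameworkOffHeapBytes + taskOffHeapBytes + networkBytes;
    }

    // Render the derived limit as the JVM flag passed at TaskManager startup.
    static String toJvmArg(long maxDirectBytes) {
        return "-XX:MaxDirectMemorySize=" + maxDirectBytes;
    }
}
```

This is also why the limit is "loose", as Xintong notes below: any native-memory share of the off-heap budgets inflates the limit beyond the actual direct allocations.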
On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <tonysong...@gmail.com> wrote:

> Regarding the framework/task direct/native memory options, I tend to think
> about it differently. I'm in favor of keeping "*.off-heap.size" for the
> config option keys.
>
>    - It's not necessary IMO to expose the different concepts of direct /
>    native memory to the users.
>    - I would avoid introducing more options for native memory if
>    possible. Taking fine-grained resource management and dynamic slot
>    allocation into consideration, that would also mean introducing more
>    fields into ResourceSpec / ResourceProfile.
>    - My gut feeling is that having a relatively loose MaxDirectMemory
>    should not be a big problem.
>       - In most cases, the task / framework off-heap memory should be
>       mainly (if not all) direct memory, so the difference between the
>       derived MaxDirectMemory and the ideal direct memory limit should
>       not be too large.
>       - We do not have a good way to know the exact sizes needed for JVM
>       overhead / metaspace and framework / task off-heap memory, and thus
>       have to conservatively reserve slightly more memory than what is
>       actually needed. Such reserved but unused memory can cover for a
>       small MaxDirectMemory error.
>       - MaxDirectMemory is not the only way to trigger full GC. We still
>       have heap activities that can also trigger GC.
>
> Regarding the memory type config options, I've looked into the latest
> ConfigOptions changes. I think it shouldn't be too complicated to change
> the config options to use the memory type, and I can handle it, maybe
> during your vacations.
>
> Also agree with improving MemorySize logging and parsing. This should not
> be a blocker that has to be done in 1.10. I would say we finish the other
> work (testability, documentation, and the items discussed in this thread)
> first, and get to this only if we have time.
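The MemorySize improvement Andrey proposes further down the thread (a `.prettyPrint()` for logs and `.fromMegabytes`-style factories instead of `.parse(int + "m")`) could look roughly like this. All names here are assumptions sketching the suggestion, not Flink's actual API:

```java
// Hedged sketch of the suggested MemorySize usability improvements.
class MemorySizeSketch {
    final long bytes;

    MemorySizeSketch(long bytes) { this.bytes = bytes; }

    // Factory method in the spirit of the suggested .fromMegabytes/etc.
    // helpers, so call sites avoid string concatenation like parse(64 + "m").
    static MemorySizeSketch ofMebiBytes(long mebiBytes) {
        return new MemorySizeSketch(mebiBytes << 20);
    }

    // Reduce the byte count to the largest unit that divides it evenly,
    // so a log line reads "64 mb" instead of "67108864".
    String prettyPrint() {
        String[] units = {"bytes", "kb", "mb", "gb", "tb"};
        long value = bytes;
        int unit = 0;
        while (unit < units.length - 1 && value >= 1024 && value % 1024 == 0) {
            value /= 1024;
            unit++;
        }
        return value + " " + units[unit];
    }
}
```

Values that do not reduce to a whole unit (e.g. 1536 bytes) would simply stay in bytes, so log output never loses precision.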
>
> Thank you~
>
> Xintong Song
>
>
> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin <azagrebin.apa...@gmail.com> wrote:
>
>> Hi Stephan and Xintong,
>>
>> Thanks for the further FLIP-49 feedback.
>>
>>> - "taskmanager.memory.size" (old main config option) is replaced by
>>> "taskmanager.memory.total-process.size", which has a different meaning in
>>> standalone setups. The old option did not subtract metaspace and other
>>> overhead, while the new option does. That means that with the default
>>> config, standalone clusters get quite a bit less memory (independent of
>>> managed memory going off-heap).
>>> I am wondering if we could interpret "taskmanager.memory.size" as
>>> the deprecated key for "taskmanager.memory.total-flink.size". That would be
>>> in line with the old mechanism (assuming managed memory is set to off-heap).
>>> The effect would be that the container size on Yarn/Mesos increases,
>>> because from "taskmanager.memory.total-flink.size" we need to add overhead
>>> and metaspace to reach the total process size, rather than cutting off
>>> memory. But if we want, we could even adjust for that in the active
>>> resource manager, getting full backwards compatibility on that part.
>>> Curious to hear more thoughts there.
>>
>> I believe you mean "taskmanager.heap.size".
>>
>> I think the problem here is that the legacy "taskmanager.heap.size" was
>> used differently in standalone setups and active Yarn / Mesos setups, and
>> such different calculation logic and behavior is exactly what we want to
>> avoid with FLIP-49. Therefore, I'm not in favor of treating
>> "taskmanager.memory.total-flink.size" differently for standalone and active
>> setups.
>>
>> I think what we really want is probably to map "taskmanager.heap.size"
>> to different new config options in different setups.
How about we
>> mark "taskmanager.heap.size" as a deprecated key for neither
>> "taskmanager.memory.total-process.size" nor
>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>> configured) in the startup scripts / active resource managers, and set the
>> value to "taskmanager.memory.total-flink.size" in the scripts and
>> "taskmanager.memory.total-process.size" in the active resource managers (if
>> the new config options are not configured). We can provide util methods in
>> TaskExecutorResourceUtils for such conversions, to keep all the
>> configuration logic in one place.
>>
>>
>> I agree that the problem is that the legacy option
>> 'taskmanager.heap.size' has different semantics for standalone / container
>> setups. We initially had it falling back to
>> 'taskmanager.memory.total-flink.size', but I changed that to align it with
>> the container cut-off. Now I see that this changes the standalone setup.
>> +1 for supporting its backwards compatibility differently for
>> standalone / container setups.
>>
>>
>>> - Mini Cluster tries to imitate the exact ratio of memory pools of a
>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>> the right approach here.
>>> For example: I started a relatively large JVM (large heap size of 10
>>> GB) as a test. With the current logic, the system tries to reserve an
>>> additional 6GB for managed memory, which is more than there is memory left.
>>> When you see the error that no memory could be allocated, you need to
>>> understand the magic of how this is derived.
>>> I am trying to think about this from the perspective of using "Flink
>>> as a Library", which the MiniCluster is close to.
>>> When starting Flink out of a running process, we cannot assume that
>>> we are the only users of that process and that we can mold the process to
>>> our demands.
I think a fixed value for managed memory and network memory
>>> would feel more natural in such a setup than a mechanism that is tailored
>>> towards exclusive use of the process.
>>
>> +1 on having fixed values for managed / shuffle memory.
>>
>>
>> Also +1 for that, if the user has not specified any main options to derive
>> memory from. We should also log this fixing of managed / shuffle memory.
>> And just noticed: we could also sanity-check the framework heap and, if
>> explicitly configured, the task heap against the available JVM heap, and at
>> least log inconsistencies.
>>
>>> - Some off-heap memory goes into direct memory, some does not. This
>>> confused me a bit. For example, "taskmanager.memory.framework.off-heap.size"
>>> is counted into MaxDirectMemory while
>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>> we should rename the keys to reflect that. There is no one "off-heap"
>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>
>> I believe "taskmanager.memory.task.off-heap.size" is also accounted for in
>> the max direct memory size limit. The confusion probably comes from the fact
>> that "taskmanager.memory.framework.off-heap.size" explicitly mentions that in
>> its description while "taskmanager.memory.task.off-heap.size" doesn't.
>> That's actually because the framework off-heap memory was introduced later
>> in a separate commit. We should fix that.
>>
>> For framework / task off-heap memory, we do not differentiate direct /
>> native memory usage. That means the configured value for these two options
>> could be a mixture of direct / native memory. Since we do not know the
>> portion of direct memory out of the configured value, we have to
>> conservatively account all of it into the max direct memory size limit.
>>
>>
>> *==> In that case, I am a bit confused. For the total size calculation,
>> it is fine. But why do we then set MaxDirectMemory?
It is a difficult
>> parameter, and the main reason to set it was (if I recall correctly) to
>> trigger GC based on direct memory allocation (to free heap structures that
>> then in turn release direct memory). If the limit is anyway too high
>> (because we also count native memory in there), such that we can exceed the
>> total process (container) memory, why do we set it then?*
>>
>>
>> I always also thought about it as providing more of a safety net for direct
>> allocations, but the GC aspect looks more important.
>>
>> +1 for fixing the docs for 'taskmanager.memory.task.off-heap.size' and for
>> renaming it to 'direct', as this is what really happens,
>> if we want to support the direct limit more exactly than now.
>>
>> I also think that it is hard to separate direct / native memory unless we
>> introduce even more options.
>> If a user wants to keep the direct limit tight to a certain value but also
>> use native memory outside of it,
>> she would have to increase something else, like JVM overhead, to account
>> for it, and there is no better way.
>> Having more options to account for native memory outside of the direct
>> limit complicates things, but it can be introduced later if needed.
>>
>>> - What do you think about renaming
>>> "taskmanager.memory.total-flink.size" to "taskmanager.memory.flink.size"
>>> and "taskmanager.memory.total-process.size" to
>>> "taskmanager.memory.process.size" (or "taskmanager.memory.jvm.total")? I
>>> think these keys may be a bit less clumsy (dropping the "total-") without
>>> loss of expressiveness.
>>
>> +1 on this.
>>
>>
>> +1 as well. Also an option: 'taskmanager.memory.total-process.size' ->
>> 'taskmanager.memory.jvm.process.size',
>> although it can also be mentioned in the docs that we mean the JVM process.
>>
>>> - The network memory keys are now called
>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>> understood as a redistribution (random, or maybe by hash of key).
As an
>>> example, there are many discussions about "shuffle join versus broadcast
>>> join", where "shuffle" is a synonym for "re-partitioning". We use that
>>> memory, however, for all network operations, like forward pipes, broadcasts,
>>> receiver-side buffering on checkpoints, etc. I find the name "*.shuffle.*"
>>> confusing, and I am wondering if users would find it confusing as well. So I am
>>> throwing in the suggestion to call the options "taskmanager.memory.network.*".
>>
>> +0 on this one. I'm OK with "taskmanager.memory.network.*". On the other
>> hand, one can also argue that this part of memory is used by the
>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>> directly to the shuffle service components.
>>
>>
>> *==> In that case, the name "ShuffleEnvironment" may be a bit incorrect,
>> because it does more than just shuffles. The ShuffleEnvironment is
>> also more internal, so the name is not too critical. This isn't super high
>> priority for me, but if we want to adjust it, better earlier than later.*
>>
>>
>> This is also a somewhat controversial topic for me. Indeed, we have always
>> used 'network' for this concept of task data shuffling over the network, and
>> this can confuse existing users.
>>
>> On the other hand, for new users and in the long term, 'network' can
>> mislead one into concluding that all network memory is managed by this option.
>> Also, other types of shuffle might not directly deal with the network at all.
>>
>> By calling it shuffle, we were somewhat biased by understanding it in
>> terms of map/reduce. This is rather an inter-task data exchange.
>> Maybe then 'taskmanager.memory.shuffle.communication.*' or
>> 'taskmanager.memory.task.shuffle/communication/io/network.*'.
>>
>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys say
>>> that it also accounts for I/O direct memory, but the parameter is not
>>> counted into the MaxDirectMemory parameter.
>>
Since we already have framework off-heap memory accounted for ad >> hoc direct memory usages, accounting all of jvm-overhead also into max >> direct memory limit seems not necessary. I would suggest to remove "I/O >> direct memory" from the description, and explicitly mention that this >> option does not account for direct memory and will not be accounted into >> max direct memory limit. WDYT? >> >> >> +1 >> >> - Can make the new ConfigOptions strongly typed with the new >>> configuration options. For example, directly use MemorySize typed options. >>> That makes validation automatic and saves us from breaking the options >>> later. >> >> +1. Wasn't aware of the new memory type config options. >> >> >> +1 >> >> *==> Thanks. Do you need help with adjusting this?* >> >> >> I would appreciate it. >> >> Also small side note, we extensively use MemorySize in logs now but it >> might be not always readable as its string representation is only in bytes >> atm >> and does not reduce it to kb/mb/etc in case of big bytes value. We could >> have at least some .prettyPrint function to use in logs. >> And .fromMegabytes/etc factory methods would improve code readability >> instead of .parse(int + “m”). >> >> Best, >> Andrey >> >> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote: >> >> Hi Xintong! >> >> Please find my answers inline: >> >>> - "taskmanager.memory.size" (old main config option) is replaced by >>>> "taskmanager.memory.total-process.size" which has a different meaning in >>>> standalone setups. The old option did not subtract metaspace and other >>>> overhead, while the new option does. That means that with the default >>>> config, standalone clusters get quite a bit less memory. (independent of >>>> managed memory going off heap). >>>> I am wondering if we could interpret "taskmanager.memory.size" as >>>> the deprecated key for "taskmanager.memory.total-flink.size". 
That would be
>>>> in line with the old mechanism (assuming managed memory is set to off-heap).
>>>> The effect would be that the container size on Yarn/Mesos
>>>> increases, because from "taskmanager.memory.total-flink.size" we need to
>>>> add overhead and metaspace to reach the total process size, rather than
>>>> cutting off memory. But if we want, we could even adjust for that in the
>>>> active resource manager, getting full backwards compatibility on that part.
>>>> Curious to hear more thoughts there.
>>>
>>> I believe you mean "taskmanager.heap.size".
>>
>> *==> Yes*
>>
>>> I think the problem here is that the legacy "taskmanager.heap.size" was
>>> used differently in standalone setups and active Yarn / Mesos setups, and
>>> such different calculation logic and behavior is exactly what we want to
>>> avoid with FLIP-49. Therefore, I'm not in favor of treating
>>> "taskmanager.memory.total-flink.size" differently for standalone and active
>>> setups.
>>>
>>> I think what we really want is probably to map "taskmanager.heap.size"
>>> to different new config options in different setups. How about we
>>> mark "taskmanager.heap.size" as a deprecated key for neither
>>> "taskmanager.memory.total-process.size" nor
>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>>> configured) in the startup scripts / active resource managers, and set the
>>> value to "taskmanager.memory.total-flink.size" in the scripts and
>>> "taskmanager.memory.total-process.size" in the active resource managers (if
>>> the new config options are not configured). We can provide util methods in
>>> TaskExecutorResourceUtils for such conversions, to keep all the
>>> configuration logic in one place.
>>
>> *==> This is pretty much what I meant as well (maybe my description was
>> not very clear), so +1 for that mechanism*
>>
>>> - Mini Cluster tries to imitate the exact ratio of memory pools of a
>>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>>> the right approach here.
>>>> For example: I started a relatively large JVM (large heap size of
>>>> 10 GB) as a test. With the current logic, the system tries to reserve an
>>>> additional 6GB for managed memory, which is more than there is memory left.
>>>> When you see the error that no memory could be allocated, you need to
>>>> understand the magic of how this is derived.
>>>> I am trying to think about this from the perspective of using
>>>> "Flink as a Library", which the MiniCluster is close to.
>>>> When starting Flink out of a running process, we cannot assume that
>>>> we are the only users of that process and that we can mold the process to
>>>> our demands. I think a fixed value for managed memory and network memory
>>>> would feel more natural in such a setup than a mechanism that is tailored
>>>> towards exclusive use of the process.
>>>
>>> +1 on having fixed values for managed / shuffle memory.
>>
>> *==> Cool, let's also see what Andrey and Till think here.*
>>
>>> - Some off-heap memory goes into direct memory, some does not. This
>>>> confused me a bit. For example, "taskmanager.memory.framework.off-heap.size"
>>>> is counted into MaxDirectMemory while
>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>>> we should rename the keys to reflect that. There is no one "off-heap"
>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>>
>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted for in
>>> the max direct memory size limit. The confusion probably comes from the fact
>>> that "taskmanager.memory.framework.off-heap.size" explicitly mentions that in
>>> its description while "taskmanager.memory.task.off-heap.size" doesn't.
>>> That's actually because the framework off-heap memory was introduced later
>>> in a separate commit. We should fix that.
>>>
>>> For framework / task off-heap memory, we do not differentiate direct /
>>> native memory usage. That means the configured value for these two options
>>> could be a mixture of direct / native memory. Since we do not know the
>>> portion of direct memory out of the configured value, we have to
>>> conservatively account all of it into the max direct memory size limit.
>>
>> *==> In that case, I am a bit confused. For the total size calculation,
>> it is fine. But why do we then set MaxDirectMemory? It is a difficult
>> parameter, and the main reason to set it was (if I recall correctly) to
>> trigger GC based on direct memory allocation (to free heap structures that
>> then in turn release direct memory). If the limit is anyway too high
>> (because we also count native memory in there), such that we can exceed the
>> total process (container) memory, why do we set it then?*
>>
>>> - The network memory keys are now called
>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>>> understood as a redistribution (random, or maybe by hash of key). As an
>>>> example, there are many discussions about "shuffle join versus broadcast
>>>> join", where "shuffle" is a synonym for "re-partitioning". We use that
>>>> memory, however, for all network operations, like forward pipes, broadcasts,
>>>> receiver-side buffering on checkpoints, etc. I find the name "*.shuffle.*"
>>>> confusing, and I am wondering if users would find it confusing as well. So I am
>>>> throwing in the suggestion to call the options "taskmanager.memory.network.*".
>>>
>>> +0 on this one. I'm OK with "taskmanager.memory.network.*". On the other
>>> hand, one can also argue that this part of memory is used by the
>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>>> directly to the shuffle service components.
>>
>> *==> In that case, the name "ShuffleEnvironment" may be a bit incorrect,
>> because it does more than just shuffles. The ShuffleEnvironment is
>> also more internal, so the name is not too critical. This isn't super high
>> priority for me, but if we want to adjust it, better earlier than later.*
>>
>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys
>>> say that it also accounts for I/O direct memory, but the parameter is not
>>> counted into the MaxDirectMemory parameter.
>>>
>>> True. Since we already have the framework off-heap memory accounting for ad
>>> hoc direct memory usage, accounting all of the JVM overhead into the max
>>> direct memory limit as well seems unnecessary. I would suggest removing "I/O
>>> direct memory" from the description, and explicitly mentioning that this
>>> option does not account for direct memory and will not be counted into the
>>> max direct memory limit. WDYT?
>>
>> *==> Sounds good.*
>>
>>> - We can make the new ConfigOptions strongly typed with the new
>>>> configuration options. For example, directly use MemorySize-typed options.
>>>> That makes validation automatic and saves us from breaking the options
>>>> later.
>>>
>>> +1. Wasn't aware of the new memory type config options.
>>
>> *==> Thanks. Do you need help with adjusting this?*
>>
>> Best,
>> Stephan
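Pulling the renaming proposals from this thread together, a flink-conf.yaml under the suggested keys might look roughly as follows. This is a sketch only: the names were still under discussion at this point ("network" vs. "shuffle" in particular was unresolved), and the values are merely illustrative, not recommended defaults.

```
# Sketch of the key names proposed in this thread (not final).
taskmanager.memory.process.size: 1728m        # was: taskmanager.memory.total-process.size
taskmanager.memory.flink.size: 1280m          # was: taskmanager.memory.total-flink.size
taskmanager.memory.framework.off-heap.size: 128m   # native + direct, fully counted into MaxDirectMemory
taskmanager.memory.task.off-heap.size: 0m          # native + direct, fully counted into MaxDirectMemory
taskmanager.memory.network.min: 64m           # proposed rename of taskmanager.memory.shuffle.*
taskmanager.memory.network.max: 1g
```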