How about putting "taskmanager.memory.flink.size" in the default configuration? Then a newly downloaded Flink behaves similarly to the previous standalone setups.
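(For illustration, a rough sketch of the legacy-key handling discussed further down in the thread: parse the deprecated "taskmanager.heap.size" and map it to the new options, differently per setup. The class and method names are assumptions for this sketch, not the actual implementation; the shortened key names follow the renaming proposed later in the thread.)

    import org.apache.flink.configuration.Configuration;

    final class LegacyHeapSizeFallback {

        private static final String LEGACY_KEY = "taskmanager.heap.size";
        private static final String FLINK_SIZE_KEY = "taskmanager.memory.flink.size";
        private static final String PROCESS_SIZE_KEY = "taskmanager.memory.process.size";

        /** Applies the fallback only if none of the new options is configured explicitly. */
        static void applyLegacyFallback(Configuration conf, boolean activeResourceManager) {
            boolean newKeysConfigured =
                    conf.containsKey(FLINK_SIZE_KEY) || conf.containsKey(PROCESS_SIZE_KEY);
            if (newKeysConfigured || !conf.containsKey(LEGACY_KEY)) {
                return;
            }
            String legacyValue = conf.getString(LEGACY_KEY, null);
            // Standalone: the legacy value meant the total Flink memory.
            // Active Yarn/Mesos setups: the legacy value meant the total process (container) size.
            conf.setString(activeResourceManager ? PROCESS_SIZE_KEY : FLINK_SIZE_KEY, legacyValue);
        }
    }

The point of such a helper would be to keep the legacy mapping in one place (e.g. something along the lines of TaskExecutorResourceUtils, as suggested below), rather than in each startup script and resource manager.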
If someone upgrades the binaries, but re-uses their old configuration, then they get the compatibility as discussed previously. We used that approach previously with the fine-grained failover recovery. On Mon, Dec 23, 2019 at 3:27 AM Xintong Song <tonysong...@gmail.com> wrote: > +1 to not have more options for off-heap memory, that would get confusing >> fast. We can keep the name "off-heap" for both task and framework, as long >> as they mean the same thing: native plus direct, and fully counted into >> MaxDirectMemory. I would suggest to update the config descriptions then to >> reflect that. >> > True, this should be explained in the config descriptions. > > Which configuration option will be set in Flink's default flink-conf.yaml? >> If we want to maintain the existing behaviour it would have to be the then >> deprecated taskmanager.heap.size config option. If we are ok with Yarn >> requesting slightly larger containers, then it could also >> be taskmanager.memory.total-flink.size. >> > Good point. Currently, we have "taskmanager.memory.total-process.size". In > order to preserve the previous behavior, we need to have > "taskmanager.heap.size" so it can be mapped to different new options in > standalone / active setups. > I think we can have the deprecated "taskmanager.heap.size" in the default > flink-conf.yaml, and also have the > new "taskmanager.memory.total-process.size" in a commented line. We can > explain how the deprecated config option behaves differently in the > comments, so that user can switch to the new config options if they want to. > > Thank you~ > > Xintong Song > > > > On Sat, Dec 21, 2019 at 1:00 AM Till Rohrmann <trohrm...@apache.org> > wrote: > >> Thanks for the feedback and good discussion everyone. I left some >> comments inline. >> >> On Fri, Dec 20, 2019 at 1:59 PM Stephan Ewen <se...@apache.org> wrote: >> >>> +1 to not have more options for off-heap memory, that would get >>> confusing fast. We can keep the name "off-heap" for both task and >>> framework, as long as they mean the same thing: native plus direct, and >>> fully counted into MaxDirectMemory. I would suggest to update the config >>> descriptions then to reflect that. >>> >>> >>> >>> On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <tonysong...@gmail.com> >>> wrote: >>> >>>> Regarding the framework/task direct/native memory options, I tend to >>>> think it differently. I'm in favor of keep the "*.off-heap.size" for the >>>> config option keys. >>>> >>>> - It's not necessary IMO to expose the difference concepts of >>>> direct / native memory to the users. >>>> - I would avoid introducing more options for native memory if >>>> possible. Taking fine grained resource management and dynamic slot >>>> allocation into consideration, that also means introduce more fields >>>> into >>>> ResourceSpec / ResourceProfile. >>>> - My gut feeling is that having a relative loose MaxDirectMemory >>>> should not be a big problem. >>>> - In most cases, the task / framework off-heap memory should be >>>> mainly (if not all) direct memory, so the difference between derived >>>> MaxDirectMemory and the ideal direct memory limit should not be too >>>> much. >>>> - We do not have a good way to know the exact size needed for >>>> jvm overhead / metaspace and framework / task off-heap memory, thus >>>> having >>>> to conservatively reserve slightly more memory then what actually >>>> needed. >>>> Such reserved but not used memory can cover for the small >>>> MaxDirectMemory >>>> error. 
>>>> - >>>> - MaxDirectMemory is not the only way to trigger full gc. We >>>> still heap activities that can also trigger the gc. >>>> >>>> Regarding the memory type config options, I've looked into the latest >>>> ConfigOptions changes. I think it shouldn't be too complicated to change >>>> the config options to use memory type, and I can handle it maybe during >>>> your vacations. >>>> >>>> >>>> Also agree with improving MemorySize logging and parsing. This should >>>> not be a blocker that has to be done in 1.10. I would say we finish other >>>> works (testability, documentation and those discussed in this thread) >>>> first, and get to this only if we have time. >>>> >>>> >>>> Thank you~ >>>> >>>> Xintong Song >>>> >>>> >>>> >>>> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin < >>>> azagrebin.apa...@gmail.com> wrote: >>>> >>>>> Hi Stephan and Xintong, >>>>> >>>>> Thanks for the further FLIP-49 feedbacks. >>>>> >>>>> - "taskmanager.memory.size" (old main config option) is replaced by >>>>>> "taskmanager.memory.total-process.size" which has a different meaning in >>>>>> standalone setups. The old option did not subtract metaspace and other >>>>>> overhead, while the new option does. That means that with the default >>>>>> config, standalone clusters get quite a bit less memory. (independent of >>>>>> managed memory going off heap). >>>>>> I am wondering if we could interpret "taskmanager.memory.size" as >>>>>> the deprecated key for "taskmanager.memory.total-flink.size". That would >>>>>> be >>>>>> in line with the old mechanism (assuming managed memory is set to off >>>>>> heap). >>>>>> The effect would be that the container size on Yarn/Mesos >>>>>> increases, because from "taskmanager.memory.total-flink.size", we need to >>>>>> add overhead and metaspace to reach the total process size, rather than >>>>>> cutting off memory. But if we want, we could even adjust for that in the >>>>>> active resource manager, getting full backwards compatibility on that >>>>>> part. >>>>>> Curious to hear more thoughts there. >>>>> >>>>> >>>>> I believe you mean "taskmanager.heap.size". >>>>> >>>>> I think the problem here is that the legacy "taskmanager.heap.size" >>>>> was used differently in standalone setups and active yarn / mesos setups, >>>>> and such different calculation logics and behaviors are exactly what we >>>>> want to avoid with FLIP-49. Therefore, I'm not in favor of treating >>>>> "taskmanager.memory.total-flink.size" differently for standalone and >>>>> active >>>>> setups. >>>>> >>>>> I think what we really want is probably >>>>> mapping "taskmanager.heap.size" to different new config options in >>>>> different setups. How about we mark "taskmanager.heap.size" as deprecated >>>>> key for neither of "taskmanager.memory.total-process.size" and >>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly >>>>> configured) in startup scripts / active resource managers, and set the >>>>> value to "taskmanager.memory.total-flink.size" in the scripts and >>>>> "taskmanager.memory.total-process.size" in active resource managers (if >>>>> the >>>>> new config options are not configured). We can provide util methods in >>>>> TaskExecutorResourceUtils for such conversions, to keep all the >>>>> configuration logics at one place. >>>>> >>>>> >>>>> I agree that the problem is that the legacy option >>>>> ‘taskmanager.heap.size’ has different semantics for standalone/container. 
>>>>> We had it initially falling back to 'taskmanager.memory.total-flink.size’ >>>>> but I changed that to align it with container cut-off. Now I see it >>>>> changes >>>>> standalone setup then. >>>>> +1 for supporting its backwards compatibility differently for >>>>> standalone/container setups. >>>>> >>>> >> Which configuration option will be set in Flink's default >> flink-conf.yaml? If we want to maintain the existing behaviour it would >> have to be the then deprecated taskmanager.heap.size config option. If we >> are ok with Yarn requesting slightly larger containers, then it could also >> be taskmanager.memory.total-flink.size. >> >>> >>>>> >>>>> - Mini Cluster tries to imitate exact ratio of memory pools as a >>>>>> standalone setup. I get the idea behind that, but I am wondering if it is >>>>>> the right approach here. >>>>>> For example: I started a relatively large JVM (large heap size of >>>>>> 10 GB) as a test. With the current logic, the system tries to reserve an >>>>>> additional 6GB for managed memory which is more than there is memory >>>>>> left. >>>>>> When you see the error that no memory could be allocated, you need to >>>>>> understand the magic of how this is derived. >>>>>> I am trying to think about this from the perspective of using >>>>>> "Flink as a Library", which the MiniCluster is close to. >>>>>> When starting Flink out of a running process, we cannot assume >>>>>> that we are the only users of that process and that we can mold the >>>>>> process >>>>>> to our demands. I think a fix value for managed memory and network memory >>>>>> would feel more natural in such a setup than a mechanism that is tailored >>>>>> towards exclusive use of the process. >>>>> >>>>> >>>>> +1 on having fixed values for managed / shuffle memory. >>>>> >>>>> >>>>> also +1 for that, if user has not specified any main options to derive >>>>> memory. We should also log this fixing of managed / shuffle memory. >>>>> And just noticed, we could also sanity check framework and if >>>>> explicitly configured task heap against available JVM heap, and at least >>>>> log inconsistencies. >>>>> >>>>> - Some off-heap memory goes into direct memory, some does not. This >>>>>> confused me a bit. For example >>>>>> "taskmanager.memory.framework.off-heap.size" >>>>>> is counted into MaxDirectMemory while >>>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. >>>>>> Maybe >>>>>> we should rename the keys to reflect that. There is no one "off heap" >>>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb" >>>>>> and "taskmanager.memory.framework.direct: XXXmb" instead? >>>>> >>>>> >>>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted in >>>>> the max direct memory size limit. The confusion probably comes from that >>>>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in >>>>> its description while "taskmanager.memory.task.off-heap.size" didn't. >>>>> That's actually because the framework off-heap memory is introduced later >>>>> in a separate commit. We should fix that. >>>>> >>>>> For framework / task off-heap memory, we do not differentiate direct / >>>>> native memory usage. That means the configure value for these two options >>>>> could be a mixture of direct / native memory. Since we do not know the >>>>> portion of direct memory out of the configured value, we have >>>>> to conservatively account it all into the max direct memory size limit. >>>>> >>>>> >>>>> *==> In that case, I am a bit confused. 
For the total size >>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It is a >>>>> difficult parameter, and the main reason to set it was (if I recall >>>>> correctly) to trigger GC based on direct memory allocation (to free heap >>>>> structures that then in turn release direct memory). If the limit is >>>>> anyways too high (because we also count native memory in there) such that >>>>> we can exceed the total process (container) memory, why do we set it >>>>> then?* >>>>> >>>>> >>>>> I always also thought about it as providing more safety net for direct >>>>> allocations but GC thing looks more important. >>>>> >>>>> +1 for fixing docs for 'taskmanager.memory.task.off-heap.size’ and >>>>> renaming to ‘direct' as this is what really happens >>>>> if we want to support direct limit more exact than now. >>>>> >>>>> I also think that it is hard to separate direct / native memory unless >>>>> we introduce even more options. >>>>> If user wants to keep the direct limit tight to a certain value but >>>>> also use native memory outside of it, >>>>> she would have to increase something else, like JVM overhead to >>>>> account for it and there is no other better way. >>>>> Having more options to account for the native memory outside of direct >>>>> limit complicates things but can be introduced later if needed. >>>>> >>>>> - What do you think about renaming >>>>>> "taskmanager.memory.total-flink.size" to "taskmanager.memory.flink.size" >>>>>> and "taskmanager.memory.total-process.size" to >>>>>> "taskmanager.memory.process.size" (or "taskmanager.memory.jvm.total"). I >>>>>> think these keys may be a bit less clumsy (dropping the "total-") without >>>>>> loss of expressiveness. >>>>> >>>>> >>>>> +1 on this. >>>>> >>>>> >>>>> +1 as well. Also an option: 'taskmanager.memory.total-process.size’ -> >>>>> ‘taskmanager.memory.jvm.process.size’, >>>>> although it can be also mentioned in docs that we mean JVM process. >>>>> >>>> >> I'd be in favour of Stephan's proposal for the config keys as shorter is >> usually better and they are still descriptive enough. Between >> "taskmanager.memory.process.size" and "taskmanager.memory.jvm.total" I >> would slightly favour the first variant. >> >>> >>>>> - The network memory keys are now called >>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually >>>>>> understood as a redistribution (random, or maybe by hash of key). As an >>>>>> example, there are many discussions about "shuffle join versus broadcast >>>>>> join", where "shuffle" is the synonym for "re-partitioning". We use that >>>>>> memory however for all network operations, like forward pipes, >>>>>> broadcasts, >>>>>> receiver-side buffering on checkpoints, etc. I find the name >>>>>> "*.shuffle.*" >>>>>> confusing, I am wondering if users would find that as well. So throwing >>>>>> in >>>>>> the suggestion to call the options "taskmanager.memory.network.*". >>>>> >>>>> >>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the >>>>> other hand, one can also argue that this part of memory is used by >>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more >>>>> directly to the shuffle service components. >>>>> >>>>> >>>>> *==> In that case, the name "Shuffle Environment" may be a bit >>>>> incorrect, because it is doing not only shuffles as well. The >>>>> ShuffleEnvironment is also more internal, so the name is not too critical. 
>>>>> This isn't super high priority for me, but if we want to adjust it, better >>>>> earlier than later.* >>>>> >>>>> >>>>> This is also a bit controversial topic for me. Indeed, we have always >>>>> used ’network’ for this concept of task data shuffling over the network >>>>> and >>>>> this can confuse existing users. >>>>> >>>>> On the other hand for the new users and in a long term, ’network’ can >>>>> delude into a conclusion that all network memory is managed by this >>>>> option. >>>>> Also other types of shuffle might not directly deal with network at >>>>> all. >>>>> >>>>> By calling it shuffle, we were somewhat biased by understanding it in >>>>> term of map/reduce. This is rather an inter-task data exchange. >>>>> Maybe then 'taskmanager.memory.shuffle.communication.*’ or >>>>> ‘taskmanager.memory.task.shuffle/communication/io/network.*’. >>>>> >>>> >> From a user's perspective I believe "taskmanager.memory.network" would >> be easier to understand as not everyone knows exactly what the shuffle >> service is. I see the point that it would be a bit imprecise as we can have >> different shuffle implementations but I would go with the ease of >> use/understanding here. Moreover, I think that we won't have many different >> shuffle service implementations in the foreseeable future. >> >>> >>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys >>>>>> say that it also accounts for I/O direct memory, but the parameter is not >>>>>> counted into the MaxDirectMemory parameter. >>>>> >>>>> >>>>> True. Since we already have framework off-heap memory accounted for ad >>>>> hoc direct memory usages, accounting all of jvm-overhead also into max >>>>> direct memory limit seems not necessary. I would suggest to remove "I/O >>>>> direct memory" from the description, and explicitly mention that this >>>>> option does not account for direct memory and will not be accounted into >>>>> max direct memory limit. WDYT? >>>>> >>>>> >>>>> +1 >>>>> >>>>> - Can make the new ConfigOptions strongly typed with the new >>>>>> configuration options. For example, directly use MemorySize typed >>>>>> options. >>>>>> That makes validation automatic and saves us from breaking the options >>>>>> later. >>>>> >>>>> +1. Wasn't aware of the new memory type config options. >>>>> >>>>> >>>>> +1 >>>>> >>>>> *==> Thanks. Do you need help with adjusting this?* >>>>> >>>>> >>>>> I would appreciate it. >>>>> >>>>> Also small side note, we extensively use MemorySize in logs now but it >>>>> might be not always readable as its string representation is only in bytes >>>>> atm >>>>> and does not reduce it to kb/mb/etc in case of big bytes value. We >>>>> could have at least some .prettyPrint function to use in logs. >>>>> And .fromMegabytes/etc factory methods would improve code readability >>>>> instead of .parse(int + “m”). >>>>> >>>>> Best, >>>>> Andrey >>>>> >>>>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote: >>>>> >>>>> Hi Xintong! >>>>> >>>>> Please find my answers inline: >>>>> >>>>>> - "taskmanager.memory.size" (old main config option) is replaced by >>>>>>> "taskmanager.memory.total-process.size" which has a different meaning in >>>>>>> standalone setups. The old option did not subtract metaspace and other >>>>>>> overhead, while the new option does. That means that with the default >>>>>>> config, standalone clusters get quite a bit less memory. (independent of >>>>>>> managed memory going off heap). 
>>>>>>> I am wondering if we could interpret "taskmanager.memory.size" >>>>>>> as the deprecated key for "taskmanager.memory.total-flink.size". That >>>>>>> would >>>>>>> be in line with the old mechanism (assuming managed memory is set to off >>>>>>> heap). >>>>>>> The effect would be that the container size on Yarn/Mesos >>>>>>> increases, because from "taskmanager.memory.total-flink.size", we need >>>>>>> to >>>>>>> add overhead and metaspace to reach the total process size, rather than >>>>>>> cutting off memory. But if we want, we could even adjust for that in the >>>>>>> active resource manager, getting full backwards compatibility on that >>>>>>> part. >>>>>>> Curious to hear more thoughts there. >>>>>> >>>>>> >>>>>> I believe you mean "taskmanager.heap.size". >>>>>> >>>>> >>>>> *==> Yes* >>>>> >>>>> >>>>> >>>>>> I think the problem here is that the legacy "taskmanager.heap.size" >>>>>> was used differently in standalone setups and active yarn / mesos setups, >>>>>> and such different calculation logics and behaviors are exactly what we >>>>>> want to avoid with FLIP-49. Therefore, I'm not in favor of treating >>>>>> "taskmanager.memory.total-flink.size" differently for standalone and >>>>>> active >>>>>> setups. >>>>>> >>>>>> I think what we really want is probably >>>>>> mapping "taskmanager.heap.size" to different new config options in >>>>>> different setups. How about we mark "taskmanager.heap.size" as deprecated >>>>>> key for neither of "taskmanager.memory.total-process.size" and >>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if >>>>>> explicitly >>>>>> configured) in startup scripts / active resource managers, and set the >>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and >>>>>> "taskmanager.memory.total-process.size" in active resource managers (if >>>>>> the >>>>>> new config options are not configured). We can provide util methods in >>>>>> TaskExecutorResourceUtils for such conversions, to keep all the >>>>>> configuration logics at one place. >>>>>> >>>>> >>>>> *==> This is pretty much what I meant as well (maybe my description >>>>> was not very clear), so +1 for that mechanism* >>>>> >>>>> >>>>> - Mini Cluster tries to imitate exact ratio of memory pools as a >>>>>>> standalone setup. I get the idea behind that, but I am wondering if it >>>>>>> is >>>>>>> the right approach here. >>>>>>> For example: I started a relatively large JVM (large heap size >>>>>>> of 10 GB) as a test. With the current logic, the system tries to >>>>>>> reserve an >>>>>>> additional 6GB for managed memory which is more than there is memory >>>>>>> left. >>>>>>> When you see the error that no memory could be allocated, you need to >>>>>>> understand the magic of how this is derived. >>>>>>> I am trying to think about this from the perspective of using >>>>>>> "Flink as a Library", which the MiniCluster is close to. >>>>>>> When starting Flink out of a running process, we cannot assume >>>>>>> that we are the only users of that process and that we can mold the >>>>>>> process >>>>>>> to our demands. I think a fix value for managed memory and network >>>>>>> memory >>>>>>> would feel more natural in such a setup than a mechanism that is >>>>>>> tailored >>>>>>> towards exclusive use of the process. >>>>>> >>>>>> >>>>>> +1 on having fixed values for managed / shuffle memory. >>>>>> >>>>> >>>>> *==> Cool, let's also see what Andrey and Till think here.* >>>>> >>>>> >>>>> >>>>>> - Some off-heap memory goes into direct memory, some does not. 
This >>>>>>> confused me a bit. For example >>>>>>> "taskmanager.memory.framework.off-heap.size" >>>>>>> is counted into MaxDirectMemory while >>>>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. >>>>>>> Maybe >>>>>>> we should rename the keys to reflect that. There is no one "off heap" >>>>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb" >>>>>>> and "taskmanager.memory.framework.direct: XXXmb" instead? >>>>>> >>>>>> >>>>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted >>>>>> in the max direct memory size limit. The confusion probably comes from >>>>>> that >>>>>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in >>>>>> its description while "taskmanager.memory.task.off-heap.size" didn't. >>>>>> That's actually because the framework off-heap memory is introduced later >>>>>> in a separate commit. We should fix that. >>>>>> >>>>>> For framework / task off-heap memory, we do not differentiate direct >>>>>> / native memory usage. That means the configure value for these two >>>>>> options >>>>>> could be a mixture of direct / native memory. Since we do not know the >>>>>> portion of direct memory out of the configured value, we have >>>>>> to conservatively account it all into the max direct memory size limit. >>>>>> >>>>> >>>>> *==> In that case, I am a bit confused. For the total size >>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It is a >>>>> difficult parameter, and the main reason to set it was (if I recall >>>>> correctly) to trigger GC based on direct memory allocation (to free heap >>>>> structures that then in turn release direct memory). If the limit is >>>>> anyways too high (because we also count native memory in there) such that >>>>> we can exceed the total process (container) memory, why do we set it >>>>> then?* >>>>> >>>>> >>>>> - The network memory keys are now called >>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually >>>>>>> understood as a redistribution (random, or maybe by hash of key). As an >>>>>>> example, there are many discussions about "shuffle join versus broadcast >>>>>>> join", where "shuffle" is the synonym for "re-partitioning". We use that >>>>>>> memory however for all network operations, like forward pipes, >>>>>>> broadcasts, >>>>>>> receiver-side buffering on checkpoints, etc. I find the name >>>>>>> "*.shuffle.*" >>>>>>> confusing, I am wondering if users would find that as well. So throwing >>>>>>> in >>>>>>> the suggestion to call the options "taskmanager.memory.network.*". >>>>>> >>>>>> >>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the >>>>>> other hand, one can also argue that this part of memory is used by >>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points >>>>>> more >>>>>> directly to the shuffle service components. >>>>>> >>>>> >>>>> *==> In that case, the name "Shuffle Environment" may be a bit >>>>> incorrect, because it is doing not only shuffles as well. The >>>>> ShuffleEnvironment is also more internal, so the name is not too critical. >>>>> This isn't super high priority for me, but if we want to adjust it, better >>>>> earlier than later.* >>>>> >>>>> >>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys >>>>> say that it also accounts for I/O direct memory, but the parameter is not >>>>> counted into the MaxDirectMemory parameter. >>>>> >>>>>> >>>>>> True. 
Since we already have framework off-heap memory accounted for >>>>>> ad hoc direct memory usages, accounting all of jvm-overhead also into max >>>>>> direct memory limit seems not necessary. I would suggest to remove "I/O >>>>>> direct memory" from the description, and explicitly mention that this >>>>>> option does not account for direct memory and will not be accounted into >>>>>> max direct memory limit. WDYT? >>>>>> >>>>> >>>>> *==> Sounds good. * >>>>> >>>>> >>>>> >>>>>> - Can make the new ConfigOptions strongly typed with the new >>>>>>> configuration options. For example, directly use MemorySize typed >>>>>>> options. >>>>>>> That makes validation automatic and saves us from breaking the options >>>>>>> later. >>>>>> >>>>>> +1. Wasn't aware of the new memory type config options. >>>>>> >>>>> >>>>> *==> Thanks. Do you need help with adjusting this?* >>>>> >>>>> >>>>> Best, >>>>> Stephan >>>>> >>>>> >>>>> >>>>>
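(For reference, a minimal sketch of the two follow-ups mentioned in the thread: a MemorySize-typed ConfigOption and a more readable rendering of MemorySize for logs. The option key is one from the FLIP-49 discussion; the helper and its exact formatting are assumptions for illustration only, not the eventual implementation.)

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.MemorySize;

    final class MemoryOptionsSketch {

        // Strongly typed option: values are parsed and validated as MemorySize
        // by the configuration framework instead of by ad-hoc string handling.
        static final ConfigOption<MemorySize> TASK_HEAP_MEMORY =
                ConfigOptions.key("taskmanager.memory.task.heap.size")
                        .memoryType()
                        .noDefaultValue()
                        .withDescription("Task heap memory size for the TaskExecutor.");

        // A prettyPrint-style helper as suggested above, so that logs show
        // e.g. "512.0 mb" instead of the raw byte count.
        static String prettyPrint(MemorySize size) {
            String[] units = {"bytes", "kb", "mb", "gb", "tb"};
            double value = size.getBytes();
            int unit = 0;
            while (value >= 1024 && unit < units.length - 1) {
                value /= 1024;
                unit++;
            }
            return String.format("%.1f %s", value, units[unit]);
        }
    }

With a typed option, reading the configuration yields a MemorySize directly, so validation errors surface when the configuration is parsed rather than deep inside the memory calculations.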