Thanks for the summary, Xintong! It makes sense to me.

> How about putting "taskmanager.memory.flink.size" in the configuration?
> Then new downloaded Flink behaves similar to the previous Standalone setups.
> If someone upgrades the binaries, but re-uses their old configuration,
> then they get the compatibility as discussed previously.
> We used that approach previously with the fine-grained failover recovery.

> I'm trying to understand why "taskmanager.memory.flink.size" rather than
> "taskmanager.memory.process.size" in the default flink-conf.yaml. Or put it
> another way, why do we want the newly downloaded Flink to behave similarly to
> previous standalone setups rather than previous active mode setups? Is
> there any special reason that I overlooked, which makes backwards
> compatibility for standalone setups more important than for active setups?
> IMO, "taskmanager.memory.process.size" is easier for newcomers. For
> standalone setups, users can simply configure it to their machines'
> available memory size, without needing to worry about leaving enough space
> for JVM overhead / metaspace. For containerized setups, it's more
> predictable how much memory the containers / Flink could use, which is more
> friendly for users to manage their resource quota.
> Therefore, unless there is anything I overlooked, I'm in favor of putting
> "taskmanager.memory.process.size" rather than
> "taskmanager.memory.flink.size" in the default configuration.

I agree that having "taskmanager.memory.process.size" in the default config should be easier for new users to understand and tweak, because it is simply the amount of memory they are ready to spend on the Flink application.
The problem is that when users upgrade Flink and use the new default configuration, the behaviour can change:
- either we put process memory, and then Flink memory shrinks and the new default option contradicts their previous understanding,
- or we put Flink memory, and then a larger container is requested.
The shrinking of memory sounds more implicit and worse. The increased container request will just fail in the worst case, and the memory setup can then be revisited.
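
To make the two upgrade outcomes concrete, here is a rough sketch of the arithmetic. It assumes the relation discussed in this thread (process memory = Flink memory + JVM metaspace + JVM overhead). The function names and the metaspace / overhead numbers are made up for illustration and are not Flink's defaults; the real overhead is derived from the "taskmanager.memory.jvm-overhead.[min|max|fraction]" options rather than being a fixed value.

```python
# Illustrative sketch only: names and numbers are hypothetical, not Flink code.
MB = 1024 * 1024


def process_from_flink(flink_bytes, metaspace_bytes, overhead_bytes):
    """Process size when the configured value is interpreted as Flink memory."""
    return flink_bytes + metaspace_bytes + overhead_bytes


def flink_from_process(process_bytes, metaspace_bytes, overhead_bytes):
    """Flink memory when the configured value is interpreted as process memory."""
    flink_bytes = process_bytes - metaspace_bytes - overhead_bytes
    if flink_bytes <= 0:
        raise ValueError("process size is too small for metaspace + overhead")
    return flink_bytes


configured = 1024 * MB  # the value a user carries over from an old setup
metaspace = 96 * MB     # hypothetical metaspace size
overhead = 128 * MB     # hypothetical fixed overhead (simplification)

# Default is "taskmanager.memory.process.size": Flink memory shrinks.
shrunk_flink = flink_from_process(configured, metaspace, overhead)

# Default is "taskmanager.memory.flink.size": a larger container is requested.
larger_container = process_from_flink(configured, metaspace, overhead)

print(shrunk_flink // MB, larger_container // MB)
```

With these made-up numbers the first interpretation silently leaves less memory for Flink, while the second one asks the cluster manager for more than the configured value, which is the visible failure mode mentioned above.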
We could increase the default "taskmanager.memory.process.size" to better align it with the previous default setup for standalone, but this would not remove the possible confusion for the old users. On the other hand, the option is new, and we can add a comment on how to migrate from the old one.

All in all, I now also tend towards having "taskmanager.memory.process.size" in the default config, unless there are more reasons to reduce confusion for the old standalone users.

Best,
Andrey

On Tue, Dec 24, 2019 at 5:50 AM Xintong Song <tonysong...@gmail.com> wrote:

> Hi all,
>
> It seems we have already reached consensus on most of the issues. Thanks
> everyone for the good discussion.
>
> While there are still open questions under discussion, I'd like to
> summarize the discussion so far, and list the action items that we already
> have consensus on. In this way, we can already start working on these items
> while discussing the remaining open questions. Please correct me if the
> summary does not reflect your argument.
>
> *Action Items that we already have consensus on:*
>
>    - Map "taskmanager.heap.size" to "taskmanager.memory.flink.size" for
>    standalone setups, and "taskmanager.memory.process.size" for active setups.
>    (should be mentioned in release notes)
>    - If not explicitly configured, MiniCluster should have fixed default
>    network and managed memory sizes. (should be mentioned in docs & release
>    notes)
>    - Change the memory config options' type from String to MemorySize
>    - Change the config option key "taskmanager.memory.total-flink.size"
>    to "taskmanager.memory.flink.size"
>    - Change the config option key "taskmanager.memory.total-process.size"
>    to "taskmanager.memory.process.size"
>    - Update descriptions for "taskmanager.memory.framework.off-heap.size"
>    and "taskmanager.memory.task.off-heap.size" to explicitly state that
>       - Both direct and native memory are accounted
>       - Will be fully counted into MaxDirectMemorySize
>    - Update descriptions for
>    "taskmanager.memory.jvm-overhead.[min|max|fraction]" to remove "I/O direct
>    memory" and explicitly state that it's not counted into MaxDirectMemorySize
>    - Print MemorySize with proper unit. (non-blocker for 1.10)
>
>
> *Questions that are still open:*
>
>    - Which config option do we put in the default flink-conf.yaml?
>       - "taskmanager.memory.flink.size"
>       - "taskmanager.memory.process.size"
>       - Deprecated "taskmanager.heap.size"
>    - What are the proper keys for network / shuffle memory?
>       - "taskmanager.memory.shuffle.*"
>       - "taskmanager.memory.network.*"
>       - "taskmanager.network.memory.*"
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Dec 24, 2019 at 10:19 AM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>>> How about putting "taskmanager.memory.flink.size" in the configuration?
>>> Then new downloaded Flink behaves similar to the previous Standalone setups.
>>> If someone upgrades the binaries, but re-uses their old configuration,
>>> then they get the compatibility as discussed previously.
>>> We used that approach previously with the fine-grained failover recovery.
>>
>>
>> I'm trying to understand why "taskmanager.memory.flink.size" rather than
>> "taskmanager.memory.process.size" in the default flink-conf.yaml.
>> Or put it another way, why do we want the newly downloaded Flink to behave
>> similarly to previous standalone setups rather than previous active mode
>> setups? Is there any special reason that I overlooked, which makes backwards
>> compatibility for standalone setups more important than for active setups?
>>
>> IMO, "taskmanager.memory.process.size" is easier for newcomers. For
>> standalone setups, users can simply configure it to their machines'
>> available memory size, without needing to worry about leaving enough space
>> for JVM overhead / metaspace. For containerized setups, it's more
>> predictable how much memory the containers / Flink could use, which is more
>> friendly for users to manage their resource quota.
>>
>> Therefore, unless there is anything I overlooked, I'm in favor of putting
>> "taskmanager.memory.process.size" rather than
>> "taskmanager.memory.flink.size" in the default configuration.
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Dec 24, 2019 at 4:27 AM Andrey Zagrebin <azagre...@apache.org>
>> wrote:
>>
>>>> How about putting "taskmanager.memory.flink.size" in the configuration?
>>>> Then new downloaded Flink behaves similar to the previous Standalone
>>>> setups.
>>>> If someone upgrades the binaries, but re-uses their old configuration,
>>>> then they get the compatibility as discussed previously.
>>>> We used that approach previously with the fine-grained failover
>>>> recovery.
>>>
>>> +1, this sounds like a good compromise.
>>>
>>>> +1 to not have more options for off-heap memory, that would get
>>>> confusing fast. We can keep the name "off-heap" for both task and
>>>> framework, as long as they mean the same thing: native plus direct, and
>>>> fully counted into MaxDirectMemory. I would suggest to update the config
>>>> descriptions then to reflect that.
>>>>
>>> True, this should be explained in the config descriptions.
>>>
>>> looks good to me
>>>
>>>> From a user's perspective I believe "taskmanager.memory.network" would
>>>> be easier to understand as not everyone knows exactly what the shuffle
>>>> service is. I see the point that it would be a bit imprecise as we can have
>>>> different shuffle implementations but I would go with the ease of
>>>> use/understanding here. Moreover, I think that we won't have many different
>>>> shuffle service implementations in the foreseeable future.
>>>
>>> I agree that if we cannot find any other convincing names for the
>>> options, we should keep what we already have and change it if the
>>> alternative is convincing enough.
>>> The question is also whether we still want to rename it because it was
>>> "taskmanager.network.*memory*.*" in 1.9 but "taskmanager.*memory*.network.*"
>>> is more aligned with other new memory option names.
>>> Or we can just 'un'-deprecate "taskmanager.network.*memory*.*".
>>>
>>> On Mon, Dec 23, 2019 at 8:42 PM Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> How about putting "taskmanager.memory.flink.size" in the configuration?
>>>> Then new downloaded Flink behaves similar to the previous Standalone
>>>> setups.
>>>>
>>>> If someone upgrades the binaries, but re-uses their old configuration,
>>>> then they get the compatibility as discussed previously.
>>>> We used that approach previously with the fine-grained failover
>>>> recovery.
>>>>
>>>> On Mon, Dec 23, 2019 at 3:27 AM Xintong Song <tonysong...@gmail.com>
>>>> wrote:
>>>>
>>>>>> +1 to not have more options for off-heap memory, that would get
>>>>>> confusing fast. We can keep the name "off-heap" for both task and
>>>>>> framework, as long as they mean the same thing: native plus direct, and
>>>>>> fully counted into MaxDirectMemory. I would suggest to update the config
>>>>>> descriptions then to reflect that.
>>>>>>
>>>>> True, this should be explained in the config descriptions.
>>>>>
>>>>>> Which configuration option will be set in Flink's default
>>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it would
>>>>>> have to be the then deprecated taskmanager.heap.size config option. If we
>>>>>> are ok with Yarn requesting slightly larger containers, then it could
>>>>>> also be taskmanager.memory.total-flink.size.
>>>>>>
>>>>> Good point. Currently, we have
>>>>> "taskmanager.memory.total-process.size". In order to preserve the previous
>>>>> behavior, we need to have "taskmanager.heap.size" so it can be mapped to
>>>>> different new options in standalone / active setups.
>>>>> I think we can have the deprecated "taskmanager.heap.size" in the
>>>>> default flink-conf.yaml, and also have the
>>>>> new "taskmanager.memory.total-process.size" in a commented line. We can
>>>>> explain how the deprecated config option behaves differently in the
>>>>> comments, so that users can switch to the new config options if they want
>>>>> to.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Dec 21, 2019 at 1:00 AM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the feedback and good discussion everyone. I left some
>>>>>> comments inline.
>>>>>>
>>>>>> On Fri, Dec 20, 2019 at 1:59 PM Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> +1 to not have more options for off-heap memory, that would get
>>>>>>> confusing fast. We can keep the name "off-heap" for both task and
>>>>>>> framework, as long as they mean the same thing: native plus direct, and
>>>>>>> fully counted into MaxDirectMemory. I would suggest to update the config
>>>>>>> descriptions then to reflect that.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <tonysong...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Regarding the framework/task direct/native memory options, I tend
>>>>>>>> to think it differently.
>>>>>>>> I'm in favor of keeping "*.off-heap.size" for the config option keys.
>>>>>>>>
>>>>>>>>    - It's not necessary IMO to expose the different concepts of
>>>>>>>>    direct / native memory to the users.
>>>>>>>>    - I would avoid introducing more options for native memory if
>>>>>>>>    possible. Taking fine grained resource management and dynamic slot
>>>>>>>>    allocation into consideration, that also means introducing more
>>>>>>>>    fields into ResourceSpec / ResourceProfile.
>>>>>>>>    - My gut feeling is that having a relatively loose
>>>>>>>>    MaxDirectMemory should not be a big problem.
>>>>>>>>       - In most cases, the task / framework off-heap memory should be
>>>>>>>>       mainly (if not all) direct memory, so the difference between the
>>>>>>>>       derived MaxDirectMemory and the ideal direct memory limit should
>>>>>>>>       not be too much.
>>>>>>>>       - We do not have a good way to know the exact size needed
>>>>>>>>       for jvm overhead / metaspace and framework / task off-heap
>>>>>>>>       memory, thus having to conservatively reserve slightly more
>>>>>>>>       memory than what is actually needed. Such reserved but unused
>>>>>>>>       memory can cover for the small MaxDirectMemory error.
>>>>>>>>       - MaxDirectMemory is not the only way to trigger full gc. We
>>>>>>>>       still have heap activities that can also trigger the gc.
>>>>>>>>
>>>>>>>> Regarding the memory type config options, I've looked into the
>>>>>>>> latest ConfigOptions changes. I think it shouldn't be too complicated
>>>>>>>> to change the config options to use memory type, and I can handle it
>>>>>>>> maybe during your vacations.
>>>>>>>>
>>>>>>>>
>>>>>>>> Also agree with improving MemorySize logging and parsing. This
>>>>>>>> should not be a blocker that has to be done in 1.10. I would say we
>>>>>>>> finish other works (testability, documentation and those discussed in
>>>>>>>> this thread) first, and get to this only if we have time.
>>>>>>>> >>>>>>>> >>>>>>>> Thank you~ >>>>>>>> >>>>>>>> Xintong Song >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin < >>>>>>>> azagrebin.apa...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi Stephan and Xintong, >>>>>>>>> >>>>>>>>> Thanks for the further FLIP-49 feedbacks. >>>>>>>>> >>>>>>>>> - "taskmanager.memory.size" (old main config option) is replaced >>>>>>>>>> by "taskmanager.memory.total-process.size" which has a different >>>>>>>>>> meaning in >>>>>>>>>> standalone setups. The old option did not subtract metaspace and >>>>>>>>>> other >>>>>>>>>> overhead, while the new option does. That means that with the default >>>>>>>>>> config, standalone clusters get quite a bit less memory. >>>>>>>>>> (independent of >>>>>>>>>> managed memory going off heap). >>>>>>>>>> I am wondering if we could interpret >>>>>>>>>> "taskmanager.memory.size" as the deprecated key for >>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line with >>>>>>>>>> the old >>>>>>>>>> mechanism (assuming managed memory is set to off heap). >>>>>>>>>> The effect would be that the container size on Yarn/Mesos >>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size", we >>>>>>>>>> need to >>>>>>>>>> add overhead and metaspace to reach the total process size, rather >>>>>>>>>> than >>>>>>>>>> cutting off memory. But if we want, we could even adjust for that in >>>>>>>>>> the >>>>>>>>>> active resource manager, getting full backwards compatibility on >>>>>>>>>> that part. >>>>>>>>>> Curious to hear more thoughts there. >>>>>>>>> >>>>>>>>> >>>>>>>>> I believe you mean "taskmanager.heap.size". >>>>>>>>> >>>>>>>>> I think the problem here is that the >>>>>>>>> legacy "taskmanager.heap.size" was used differently in standalone >>>>>>>>> setups >>>>>>>>> and active yarn / mesos setups, and such different calculation logics >>>>>>>>> and >>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49. 
Therefore, >>>>>>>>> I'm >>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size" >>>>>>>>> differently >>>>>>>>> for standalone and active setups. >>>>>>>>> >>>>>>>>> I think what we really want is probably >>>>>>>>> mapping "taskmanager.heap.size" to different new config options in >>>>>>>>> different setups. How about we mark "taskmanager.heap.size" as >>>>>>>>> deprecated >>>>>>>>> key for neither of "taskmanager.memory.total-process.size" and >>>>>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if >>>>>>>>> explicitly >>>>>>>>> configured) in startup scripts / active resource managers, and set the >>>>>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and >>>>>>>>> "taskmanager.memory.total-process.size" in active resource managers >>>>>>>>> (if the >>>>>>>>> new config options are not configured). We can provide util methods in >>>>>>>>> TaskExecutorResourceUtils for such conversions, to keep all the >>>>>>>>> configuration logics at one place. >>>>>>>>> >>>>>>>>> >>>>>>>>> I agree that the problem is that the legacy option >>>>>>>>> ‘taskmanager.heap.size’ has different semantics for >>>>>>>>> standalone/container. >>>>>>>>> We had it initially falling back to >>>>>>>>> 'taskmanager.memory.total-flink.size’ >>>>>>>>> but I changed that to align it with container cut-off. Now I see it >>>>>>>>> changes >>>>>>>>> standalone setup then. >>>>>>>>> +1 for supporting its backwards compatibility differently for >>>>>>>>> standalone/container setups. >>>>>>>>> >>>>>>>> >>>>>> Which configuration option will be set in Flink's default >>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it would >>>>>> have to be the then deprecated taskmanager.heap.size config option. If we >>>>>> are ok with Yarn requesting slightly larger containers, then it could >>>>>> also >>>>>> be taskmanager.memory.total-flink.size. 
>>>>>> >>>>>>> >>>>>>>>> >>>>>>>>> - Mini Cluster tries to imitate exact ratio of memory pools as a >>>>>>>>>> standalone setup. I get the idea behind that, but I am wondering if >>>>>>>>>> it is >>>>>>>>>> the right approach here. >>>>>>>>>> For example: I started a relatively large JVM (large heap >>>>>>>>>> size of 10 GB) as a test. With the current logic, the system tries to >>>>>>>>>> reserve an additional 6GB for managed memory which is more than >>>>>>>>>> there is >>>>>>>>>> memory left. When you see the error that no memory could be >>>>>>>>>> allocated, you >>>>>>>>>> need to understand the magic of how this is derived. >>>>>>>>>> I am trying to think about this from the perspective of using >>>>>>>>>> "Flink as a Library", which the MiniCluster is close to. >>>>>>>>>> When starting Flink out of a running process, we cannot >>>>>>>>>> assume that we are the only users of that process and that we can >>>>>>>>>> mold the >>>>>>>>>> process to our demands. I think a fix value for managed memory and >>>>>>>>>> network >>>>>>>>>> memory would feel more natural in such a setup than a mechanism that >>>>>>>>>> is >>>>>>>>>> tailored towards exclusive use of the process. >>>>>>>>> >>>>>>>>> >>>>>>>>> +1 on having fixed values for managed / shuffle memory. >>>>>>>>> >>>>>>>>> >>>>>>>>> also +1 for that, if user has not specified any main options to >>>>>>>>> derive memory. We should also log this fixing of managed / shuffle >>>>>>>>> memory. >>>>>>>>> And just noticed, we could also sanity check framework and if >>>>>>>>> explicitly configured task heap against available JVM heap, and at >>>>>>>>> least >>>>>>>>> log inconsistencies. >>>>>>>>> >>>>>>>>> - Some off-heap memory goes into direct memory, some does not. >>>>>>>>>> This confused me a bit. For example >>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into >>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size" is >>>>>>>>>> counted as >>>>>>>>>> native memory. 
Maybe we should rename the keys to reflect that. >>>>>>>>>> There is no >>>>>>>>>> one "off heap" memory type after all. Maybe use >>>>>>>>>> "taskmanager.memory.task.native: XXXmb" and >>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead? >>>>>>>>> >>>>>>>>> >>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also >>>>>>>>> accounted in the max direct memory size limit. The confusion probably >>>>>>>>> comes >>>>>>>>> from that "taskmanager.memory.framework.off-heap.size" explicitly >>>>>>>>> mentioned >>>>>>>>> that in its description while "taskmanager.memory.task.off-heap.size" >>>>>>>>> didn't. That's actually because the framework off-heap memory is >>>>>>>>> introduced >>>>>>>>> later in a separate commit. We should fix that. >>>>>>>>> >>>>>>>>> For framework / task off-heap memory, we do not differentiate >>>>>>>>> direct / native memory usage. That means the configure value for >>>>>>>>> these two >>>>>>>>> options could be a mixture of direct / native memory. Since we do not >>>>>>>>> know >>>>>>>>> the portion of direct memory out of the configured value, we have >>>>>>>>> to conservatively account it all into the max direct memory size >>>>>>>>> limit. >>>>>>>>> >>>>>>>>> >>>>>>>>> *==> In that case, I am a bit confused. For the total size >>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It >>>>>>>>> is a >>>>>>>>> difficult parameter, and the main reason to set it was (if I recall >>>>>>>>> correctly) to trigger GC based on direct memory allocation (to free >>>>>>>>> heap >>>>>>>>> structures that then in turn release direct memory). If the limit is >>>>>>>>> anyways too high (because we also count native memory in there) such >>>>>>>>> that >>>>>>>>> we can exceed the total process (container) memory, why do we set it >>>>>>>>> then?* >>>>>>>>> >>>>>>>>> >>>>>>>>> I always also thought about it as providing more safety net for >>>>>>>>> direct allocations but GC thing looks more important. 
>>>>>>>>> >>>>>>>>> +1 for fixing docs for 'taskmanager.memory.task.off-heap.size’ and >>>>>>>>> renaming to ‘direct' as this is what really happens >>>>>>>>> if we want to support direct limit more exact than now. >>>>>>>>> >>>>>>>>> I also think that it is hard to separate direct / native memory >>>>>>>>> unless we introduce even more options. >>>>>>>>> If user wants to keep the direct limit tight to a certain value >>>>>>>>> but also use native memory outside of it, >>>>>>>>> she would have to increase something else, like JVM overhead to >>>>>>>>> account for it and there is no other better way. >>>>>>>>> Having more options to account for the native memory outside of >>>>>>>>> direct limit complicates things but can be introduced later if needed. >>>>>>>>> >>>>>>>>> - What do you think about renaming >>>>>>>>>> "taskmanager.memory.total-flink.size" to >>>>>>>>>> "taskmanager.memory.flink.size" >>>>>>>>>> and "taskmanager.memory.total-process.size" to >>>>>>>>>> "taskmanager.memory.process.size" (or >>>>>>>>>> "taskmanager.memory.jvm.total"). I >>>>>>>>>> think these keys may be a bit less clumsy (dropping the "total-") >>>>>>>>>> without >>>>>>>>>> loss of expressiveness. >>>>>>>>> >>>>>>>>> >>>>>>>>> +1 on this. >>>>>>>>> >>>>>>>>> >>>>>>>>> +1 as well. Also an option: >>>>>>>>> 'taskmanager.memory.total-process.size’ -> >>>>>>>>> ‘taskmanager.memory.jvm.process.size’, >>>>>>>>> although it can be also mentioned in docs that we mean JVM process. >>>>>>>>> >>>>>>>> >>>>>> I'd be in favour of Stephan's proposal for the config keys as shorter >>>>>> is usually better and they are still descriptive enough. Between >>>>>> "taskmanager.memory.process.size" and "taskmanager.memory.jvm.total" I >>>>>> would slightly favour the first variant. >>>>>> >>>>>>> >>>>>>>>> - The network memory keys are now called >>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually >>>>>>>>>> understood as a redistribution (random, or maybe by hash of key). 
As >>>>>>>>>> an >>>>>>>>>> example, there are many discussions about "shuffle join versus >>>>>>>>>> broadcast >>>>>>>>>> join", where "shuffle" is the synonym for "re-partitioning". We use >>>>>>>>>> that >>>>>>>>>> memory however for all network operations, like forward pipes, >>>>>>>>>> broadcasts, >>>>>>>>>> receiver-side buffering on checkpoints, etc. I find the name >>>>>>>>>> "*.shuffle.*" >>>>>>>>>> confusing, I am wondering if users would find that as well. So >>>>>>>>>> throwing in >>>>>>>>>> the suggestion to call the options "taskmanager.memory.network.*". >>>>>>>>> >>>>>>>>> >>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the >>>>>>>>> other hand, one can also argue that this part of memory is used by >>>>>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points >>>>>>>>> more >>>>>>>>> directly to the shuffle service components. >>>>>>>>> >>>>>>>>> >>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit >>>>>>>>> incorrect, because it is doing not only shuffles as well. The >>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too >>>>>>>>> critical. >>>>>>>>> This isn't super high priority for me, but if we want to adjust it, >>>>>>>>> better >>>>>>>>> earlier than later.* >>>>>>>>> >>>>>>>>> >>>>>>>>> This is also a bit controversial topic for me. Indeed, we have >>>>>>>>> always used ’network’ for this concept of task data shuffling over the >>>>>>>>> network and this can confuse existing users. >>>>>>>>> >>>>>>>>> On the other hand for the new users and in a long term, ’network’ >>>>>>>>> can delude into a conclusion that all network memory is managed by >>>>>>>>> this >>>>>>>>> option. >>>>>>>>> Also other types of shuffle might not directly deal with network >>>>>>>>> at all. >>>>>>>>> >>>>>>>>> By calling it shuffle, we were somewhat biased by understanding it >>>>>>>>> in term of map/reduce. This is rather an inter-task data exchange. 
>>>>>>>>> Maybe then 'taskmanager.memory.shuffle.communication.*’ or >>>>>>>>> ‘taskmanager.memory.task.shuffle/communication/io/network.*’. >>>>>>>>> >>>>>>>> >>>>>> From a user's perspective I believe "taskmanager.memory.network" >>>>>> would be easier to understand as not everyone knows exactly what the >>>>>> shuffle service is. I see the point that it would be a bit imprecise as >>>>>> we >>>>>> can have different shuffle implementations but I would go with the ease >>>>>> of >>>>>> use/understanding here. Moreover, I think that we won't have many >>>>>> different >>>>>> shuffle service implementations in the foreseeable future. >>>>>> >>>>>>> >>>>>>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" >>>>>>>>>> keys say that it also accounts for I/O direct memory, but the >>>>>>>>>> parameter is >>>>>>>>>> not counted into the MaxDirectMemory parameter. >>>>>>>>> >>>>>>>>> >>>>>>>>> True. Since we already have framework off-heap memory accounted >>>>>>>>> for ad hoc direct memory usages, accounting all of jvm-overhead also >>>>>>>>> into >>>>>>>>> max direct memory limit seems not necessary. I would suggest to >>>>>>>>> remove "I/O >>>>>>>>> direct memory" from the description, and explicitly mention that this >>>>>>>>> option does not account for direct memory and will not be accounted >>>>>>>>> into >>>>>>>>> max direct memory limit. WDYT? >>>>>>>>> >>>>>>>>> >>>>>>>>> +1 >>>>>>>>> >>>>>>>>> - Can make the new ConfigOptions strongly typed with the new >>>>>>>>>> configuration options. For example, directly use MemorySize typed >>>>>>>>>> options. >>>>>>>>>> That makes validation automatic and saves us from breaking the >>>>>>>>>> options >>>>>>>>>> later. >>>>>>>>> >>>>>>>>> +1. Wasn't aware of the new memory type config options. >>>>>>>>> >>>>>>>>> >>>>>>>>> +1 >>>>>>>>> >>>>>>>>> *==> Thanks. Do you need help with adjusting this?* >>>>>>>>> >>>>>>>>> >>>>>>>>> I would appreciate it. 
>>>>>>>>> >>>>>>>>> Also small side note, we extensively use MemorySize in logs now >>>>>>>>> but it might be not always readable as its string representation is >>>>>>>>> only in >>>>>>>>> bytes atm >>>>>>>>> and does not reduce it to kb/mb/etc in case of big bytes value. We >>>>>>>>> could have at least some .prettyPrint function to use in logs. >>>>>>>>> And .fromMegabytes/etc factory methods would improve code >>>>>>>>> readability instead of .parse(int + “m”). >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Andrey >>>>>>>>> >>>>>>>>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote: >>>>>>>>> >>>>>>>>> Hi Xintong! >>>>>>>>> >>>>>>>>> Please find my answers inline: >>>>>>>>> >>>>>>>>>> - "taskmanager.memory.size" (old main config option) is >>>>>>>>>>> replaced by "taskmanager.memory.total-process.size" which has a >>>>>>>>>>> different >>>>>>>>>>> meaning in standalone setups. The old option did not subtract >>>>>>>>>>> metaspace and >>>>>>>>>>> other overhead, while the new option does. That means that with the >>>>>>>>>>> default >>>>>>>>>>> config, standalone clusters get quite a bit less memory. >>>>>>>>>>> (independent of >>>>>>>>>>> managed memory going off heap). >>>>>>>>>>> I am wondering if we could interpret >>>>>>>>>>> "taskmanager.memory.size" as the deprecated key for >>>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line with >>>>>>>>>>> the old >>>>>>>>>>> mechanism (assuming managed memory is set to off heap). >>>>>>>>>>> The effect would be that the container size on Yarn/Mesos >>>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size", we >>>>>>>>>>> need to >>>>>>>>>>> add overhead and metaspace to reach the total process size, rather >>>>>>>>>>> than >>>>>>>>>>> cutting off memory. But if we want, we could even adjust for that >>>>>>>>>>> in the >>>>>>>>>>> active resource manager, getting full backwards compatibility on >>>>>>>>>>> that part. >>>>>>>>>>> Curious to hear more thoughts there. 
>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I believe you mean "taskmanager.heap.size". >>>>>>>>>> >>>>>>>>> >>>>>>>>> *==> Yes* >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>>> I think the problem here is that the >>>>>>>>>> legacy "taskmanager.heap.size" was used differently in standalone >>>>>>>>>> setups >>>>>>>>>> and active yarn / mesos setups, and such different calculation >>>>>>>>>> logics and >>>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49. Therefore, >>>>>>>>>> I'm >>>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size" >>>>>>>>>> differently >>>>>>>>>> for standalone and active setups. >>>>>>>>>> >>>>>>>>>> I think what we really want is probably >>>>>>>>>> mapping "taskmanager.heap.size" to different new config options in >>>>>>>>>> different setups. How about we mark "taskmanager.heap.size" as >>>>>>>>>> deprecated >>>>>>>>>> key for neither of "taskmanager.memory.total-process.size" and >>>>>>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if >>>>>>>>>> explicitly >>>>>>>>>> configured) in startup scripts / active resource managers, and set >>>>>>>>>> the >>>>>>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and >>>>>>>>>> "taskmanager.memory.total-process.size" in active resource managers >>>>>>>>>> (if the >>>>>>>>>> new config options are not configured). We can provide util methods >>>>>>>>>> in >>>>>>>>>> TaskExecutorResourceUtils for such conversions, to keep all the >>>>>>>>>> configuration logics at one place. >>>>>>>>>> >>>>>>>>> >>>>>>>>> *==> This is pretty much what I meant as well (maybe my >>>>>>>>> description was not very clear), so +1 for that mechanism* >>>>>>>>> >>>>>>>>> >>>>>>>>> - Mini Cluster tries to imitate exact ratio of memory pools as a >>>>>>>>>>> standalone setup. I get the idea behind that, but I am wondering if >>>>>>>>>>> it is >>>>>>>>>>> the right approach here. 
>>>>>>>>>>> For example: I started a relatively large JVM (large heap >>>>>>>>>>> size of 10 GB) as a test. With the current logic, the system tries >>>>>>>>>>> to >>>>>>>>>>> reserve an additional 6GB for managed memory which is more than >>>>>>>>>>> there is >>>>>>>>>>> memory left. When you see the error that no memory could be >>>>>>>>>>> allocated, you >>>>>>>>>>> need to understand the magic of how this is derived. >>>>>>>>>>> I am trying to think about this from the perspective of >>>>>>>>>>> using "Flink as a Library", which the MiniCluster is close to. >>>>>>>>>>> When starting Flink out of a running process, we cannot >>>>>>>>>>> assume that we are the only users of that process and that we can >>>>>>>>>>> mold the >>>>>>>>>>> process to our demands. I think a fix value for managed memory and >>>>>>>>>>> network >>>>>>>>>>> memory would feel more natural in such a setup than a mechanism >>>>>>>>>>> that is >>>>>>>>>>> tailored towards exclusive use of the process. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> +1 on having fixed values for managed / shuffle memory. >>>>>>>>>> >>>>>>>>> >>>>>>>>> *==> Cool, let's also see what Andrey and Till think here.* >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>>> - Some off-heap memory goes into direct memory, some does not. >>>>>>>>>>> This confused me a bit. For example >>>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into >>>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size" is >>>>>>>>>>> counted as >>>>>>>>>>> native memory. Maybe we should rename the keys to reflect that. >>>>>>>>>>> There is no >>>>>>>>>>> one "off heap" memory type after all. Maybe use >>>>>>>>>>> "taskmanager.memory.task.native: XXXmb" and >>>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead? >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also >>>>>>>>>> accounted in the max direct memory size limit. 
>>>>>>>>>> The confusion probably comes from the fact that
>>>>>>>>>> "taskmanager.memory.framework.off-heap.size" explicitly mentions
>>>>>>>>>> that in its description while "taskmanager.memory.task.off-heap.size"
>>>>>>>>>> doesn't. That's actually because the framework off-heap memory was
>>>>>>>>>> introduced later in a separate commit. We should fix that.
>>>>>>>>>>
>>>>>>>>>> For framework / task off-heap memory, we do not differentiate
>>>>>>>>>> direct / native memory usage. That means the configured value for
>>>>>>>>>> these two options could be a mixture of direct / native memory.
>>>>>>>>>> Since we do not know the portion of direct memory out of the
>>>>>>>>>> configured value, we have to conservatively account it all into the
>>>>>>>>>> max direct memory size limit.
>>>>>>>>>
>>>>>>>>> *==> In that case, I am a bit confused. For the total size
>>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It
>>>>>>>>> is a difficult parameter, and the main reason to set it was (if I
>>>>>>>>> recall correctly) to trigger GC based on direct memory allocation (to
>>>>>>>>> free heap structures that then in turn release direct memory). If the
>>>>>>>>> limit is anyways too high (because we also count native memory in
>>>>>>>>> there) such that we can exceed the total process (container) memory,
>>>>>>>>> why do we set it then?*
>>>>>>>>>
>>>>>>>>>>> - The network memory keys are now called
>>>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>>>>>>>>>> understood as a redistribution (random, or maybe by hash of key).
>>>>>>>>>>> As an example, there are many discussions about "shuffle join
>>>>>>>>>>> versus broadcast join", where "shuffle" is the synonym for
>>>>>>>>>>> "re-partitioning".
>>>>>>>>>>> We use that memory however for all network operations, like forward
>>>>>>>>>>> pipes, broadcasts, receiver-side buffering on checkpoints, etc. I
>>>>>>>>>>> find the name "*.shuffle.*" confusing, and I am wondering if users
>>>>>>>>>>> would find that as well. So throwing in the suggestion to call the
>>>>>>>>>>> options "taskmanager.memory.network.*".
>>>>>>>>>>
>>>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the
>>>>>>>>>> other hand, one can also argue that this part of memory is used by
>>>>>>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*"
>>>>>>>>>> points more directly to the shuffle service components.
>>>>>>>>>
>>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit
>>>>>>>>> incorrect, because it is not only doing shuffles. The
>>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too
>>>>>>>>> critical. This isn't super high priority for me, but if we want to
>>>>>>>>> adjust it, better earlier than later.*
>>>>>>>>>
>>>>>>>>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*"
>>>>>>>>>>> keys say that it also accounts for I/O direct memory, but the
>>>>>>>>>>> parameter is not counted into the MaxDirectMemory parameter.
>>>>>>>>>>
>>>>>>>>>> True. Since we already have framework off-heap memory accounted for
>>>>>>>>>> ad hoc direct memory usages, accounting all of jvm-overhead also
>>>>>>>>>> into max direct memory limit seems not necessary. I would suggest
>>>>>>>>>> removing "I/O direct memory" from the description, and explicitly
>>>>>>>>>> mentioning that this option does not account for direct memory and
>>>>>>>>>> will not be accounted into max direct memory limit. WDYT?
>>>>>>>>>
>>>>>>>>> *==> Sounds good.*
>>>>>>>>>
>>>>>>>>>>> - We can make the new ConfigOptions strongly typed with the new
>>>>>>>>>>> configuration options. For example, directly use MemorySize typed
>>>>>>>>>>> options. That makes validation automatic and saves us from breaking
>>>>>>>>>>> the options later.
>>>>>>>>>>
>>>>>>>>>> +1. Wasn't aware of the new memory type config options.
>>>>>>>>>
>>>>>>>>> *==> Thanks. Do you need help with adjusting this?*
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stephan
>>>>>>>>>
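For readers following the thread, the handling of the legacy "taskmanager.heap.size" key proposed above could look roughly like the sketch below. It is only an illustration under stated assumptions: the function name `map_legacy_heap_size`, the `active_mode` flag, and the use of a plain dict in place of Flink's configuration object are all hypothetical, and this is not the actual TaskExecutorResourceUtils code. It only shows the rule as discussed: translate the legacy key if it is explicitly configured and neither new option is set, choosing the target by setup type.

```python
from typing import Dict

# Config keys as named in the thread.
LEGACY_KEY = "taskmanager.heap.size"
FLINK_KEY = "taskmanager.memory.total-flink.size"
PROCESS_KEY = "taskmanager.memory.total-process.size"


def map_legacy_heap_size(config: Dict[str, str], active_mode: bool) -> Dict[str, str]:
    """Return a copy of `config` with the legacy key translated if needed.

    Hypothetical helper: `active_mode` stands for an active resource manager
    setup (e.g. yarn / mesos); False stands for the standalone startup scripts.
    """
    result = dict(config)
    # Nothing to translate unless the legacy key was explicitly configured.
    if LEGACY_KEY not in result:
        return result
    # An explicitly configured new option takes precedence over the legacy key.
    if FLINK_KEY in result or PROCESS_KEY in result:
        return result
    # Standalone: legacy value means total Flink memory.
    # Active setups: legacy value means total process (container) memory.
    target = PROCESS_KEY if active_mode else FLINK_KEY
    result[target] = result[LEGACY_KEY]
    return result


if __name__ == "__main__":
    legacy = {LEGACY_KEY: "1024m"}
    print(map_legacy_heap_size(legacy, active_mode=False))
    print(map_legacy_heap_size(legacy, active_mode=True))
```

The sketch leaves the legacy entry in place and only adds the derived one; whether the legacy key should be dropped after conversion is not settled in the discussion above.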