"taskmanager.memory.flink.size" in the default config has a few advantages.
- The value in the default config needs to be suitable for "trying out" Flink, for a good "getting started" experience.
- For trying out Flink, standalone is the most common entry point (except running in the IDE).
- In a standalone setup, from the total process memory we subtract quite a bit before we arrive at the usable memory. We also subtract managed memory from the heap now. I fear we might end up at a heap that becomes so small that it makes for a bad "getting started" experience.
- I don't think it is realistic that users set the process memory to the full machine memory. There is a lot else running on the machine in most cases.
- In the JVM world, users are used to configuring the heap size and know that there is additional memory overhead. The "taskmanager.memory.flink.size" option fits well with that mindset.
- Once you start to think about the total process memory of Yarn containers, you are already past the getting-started phase and into the tuning phase.

On Tue, Dec 24, 2019, 10:25 Andrey Zagrebin <azagre...@apache.org> wrote:

> Thanks for the summary, Xintong! It makes sense to me.
>
> > How about putting "taskmanager.memory.flink.size" in the configuration?
> > Then new downloaded Flink behaves similar to the previous Standalone setups.
> > If someone upgrades the binaries, but re-uses their old configuration,
> > then they get the compatibility as discussed previously.
> > We used that approach previously with the fine-grained failover recovery.
>
> > I'm trying to understand why "taskmanager.memory.flink.size" rather than
> > "taskmanager.memory.process.size" in the default flink-conf.yaml. Or put it
> > another way, why do we want the new downloaded Flink behaves similar to
> > previous Standalone setups rather than previous active mode setups? Is
> > there any special reason that I overlooked, which makes backwards
> > compatibility for standalone setups more important than for active setups?
> > IMO, "taskmanager.memory.process.size" is easier for newcomers. For
> > standalone setups, users can simply configure it to their machines'
> > available memory size, without needing to worry about leaving enough space
> > for JVM overhead / metaspace. For containerized setups, it's more
> > predictable how much memory the containers / Flink could use, which is more
> > friendly for users to manage their resource quota.
> > Therefore, unless there is anything I overlooked, I'm in favor of putting
> > "taskmanager.memory.process.size" rather than
> > "taskmanager.memory.flink.size" in the default configuration.
>
> I agree that having "taskmanager.memory.process.size" in the default config
> should be easier to understand and tweak for new users because it is
> just what they are ready to spend on the Flink application.
> The problem is that when users upgrade Flink and use the new default
> configuration, the behaviour can change:
> - either, if we put process memory, Flink memory shrinks and the new
>   default option contradicts their previous understanding,
> - or, if we put Flink memory, a larger container is requested.
> The shrinking of memory sounds more implicit and worse. The increased
> container request will just fail in the worst case and the memory setup can
> be revisited.
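[Editor's note: a minimal, purely illustrative sketch of the trade-off Andrey describes above. The metaspace and JVM-overhead numbers are assumptions for illustration, not Flink defaults; the real derivation is defined by FLIP-49.]

// Illustrative only: the same number in flink-conf.yaml leads to different
// outcomes depending on which option it is assigned to. The metaspace and
// overhead values below are assumptions, not Flink defaults.
public class MemoryOptionTradeOff {

    public static void main(String[] args) {
        long configuredMb = 1024;   // the single value a user puts in flink-conf.yaml
        long metaspaceMb = 96;      // assumed JVM metaspace budget
        long jvmOverheadMb = 192;   // assumed JVM overhead budget

        // Interpreted as taskmanager.memory.flink.size:
        // the process / container grows beyond what the user wrote.
        long processIfFlinkSize = configuredMb + metaspaceMb + jvmOverheadMb;

        // Interpreted as taskmanager.memory.process.size:
        // the memory actually usable by Flink shrinks.
        long flinkIfProcessSize = configuredMb - metaspaceMb - jvmOverheadMb;

        System.out.printf("flink.size=%dm   -> process/container ~%dm%n", configuredMb, processIfFlinkSize);
        System.out.printf("process.size=%dm -> usable Flink memory ~%dm%n", configuredMb, flinkIfProcessSize);
    }
}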
> We could increase the default "taskmanager.memory.process.size" to better
> align it with the previous default setup for standalone, but this would not
> remove the possible confusion problem for the old users. On the other hand,
> the option is new and we can add a comment on how to migrate from the old one.
>
> All in all, now I also tend to have "taskmanager.memory.process.size" in
> the default config unless there are stronger reasons to avoid confusion
> for the old standalone users.
>
> Best,
> Andrey
>
> On Tue, Dec 24, 2019 at 5:50 AM Xintong Song <tonysong...@gmail.com> wrote:
>
> > Hi all,
> >
> > It seems we have already reached consensus on most of the issues. Thanks
> > everyone for the good discussion.
> >
> > While there are still open questions under discussion, I'd like to
> > summarize the discussion so far, and list the action items that we have
> > already reached consensus on. In this way, we can already start working on
> > these items while discussing the remaining open questions. Please correct
> > me if the summary does not reflect your argument.
> >
> > *Action items that we have already reached consensus on:*
> >
> > - Map "taskmanager.heap.size" to "taskmanager.memory.flink.size" for
> > standalone setups, and to "taskmanager.memory.process.size" for active
> > setups. (should be mentioned in release notes)
> > - If not explicitly configured, MiniCluster should have fixed default
> > network and managed memory sizes. (should be mentioned in docs & release
> > notes)
> > - Change the memory config options' type from String to MemorySize
> > - Change the config option key "taskmanager.memory.total-flink.size"
> > to "taskmanager.memory.flink.size"
> > - Change the config option key "taskmanager.memory.total-process.size"
> > to "taskmanager.memory.process.size"
> > - Update descriptions for "taskmanager.memory.framework.off-heap.size"
> > and "taskmanager.memory.task.off-heap.size" to explicitly state that
> >   - Both direct and native memory are accounted for
> >   - Will be fully counted into MaxDirectMemorySize
> > - Update descriptions for
> > "taskmanager.memory.jvm-overhead.[min|max|fraction]" to remove "I/O direct
> > memory" and explicitly state that it's not counted into MaxDirectMemorySize
> > - Print MemorySize with proper unit. (non-blocker for 1.10)
> >
> > *Questions that are still open:*
> >
> > - Which config option do we put in the default flink-conf.yaml?
> >   - "taskmanager.memory.flink.size"
> >   - "taskmanager.memory.process.size"
> >   - Deprecated "taskmanager.heap.size"
> > - What are the proper keys for network / shuffle memory?
> >   - "taskmanager.memory.shuffle.*"
> >   - "taskmanager.memory.network.*"
> >   - "taskmanager.network.memory.*"
> >
> > Thank you~
> >
> > Xintong Song
> >
> > On Tue, Dec 24, 2019 at 10:19 AM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> >> How about putting "taskmanager.memory.flink.size" in the configuration?
> >>> Then new downloaded Flink behaves similar to the previous Standalone setups.
> >>> If someone upgrades the binaries, but re-uses their old configuration,
> >>> then they get the compatibility as discussed previously.
> >>> We used that approach previously with the fine-grained failover recovery.
> >>
> >> I'm trying to understand why "taskmanager.memory.flink.size" rather than
> >> "taskmanager.memory.process.size" in the default flink-conf.yaml.
Or > put it > >> another way, why do we want the new downloaded Flink behaves similar to > >> previous Standalone setups rather than previous active mode setups? Is > >> there any special reason that I overlooked, which makes backwards > >> compatibility for standalone setups more important than for active > setups? > >> > >> IMO, "taskmanager.memory.process.size" is easier for the new comers. For > >> standalone setups, users can simply configure it to their machines' > >> available memory size, without needing to worry about leaving enough > space > >> for JVM overehead / metaspace. For containerized setups, it's more > >> predictable how many memory the containers / Flink could use, which is > more > >> friendly for users to manage their resource quota. > >> > >> Therefore, unless there is anything I overlooked, I'm in favor of > putting > >> "taskmanager.memory.process.size" rather than > >> "taskmanager.memory.flink.size" in the default configuration. > >> > >> > >> Thank you~ > >> > >> Xintong Song > >> > >> > >> > >> On Tue, Dec 24, 2019 at 4:27 AM Andrey Zagrebin <azagre...@apache.org> > >> wrote: > >> > >>> How about putting "taskmanager.memory.flink.size" in the configuration? > >>>> Then new downloaded Flink behaves similar to the previous Standalone > setups. > >>>> If someone upgrades the binaries, but re-uses their old configuration, > >>>> then they get the compatibility as discussed previously. > >>>> We used that approach previously with the fine-grained failover > >>>> recovery. > >>> > >>> +1, this sounds like a good compromise. > >>> > >>> +1 to not have more options for off-heap memory, that would get > >>>> confusing fast. We can keep the name "off-heap" for both task and > >>>> framework, as long as they mean the same thing: native plus direct, > and > >>>> fully counted into MaxDirectMemory. I would suggest to update the > config > >>>> descriptions then to reflect that. > >>>> > >>> True, this should be explained in the config descriptions. > >>> > >>> looks good to me > >>> > >>> From a user's perspective I believe "taskmanager.memory.network" would > >>>> be easier to understand as not everyone knows exactly what the shuffle > >>>> service is. I see the point that it would be a bit imprecise as we > can have > >>>> different shuffle implementations but I would go with the ease of > >>>> use/understanding here. Moreover, I think that we won't have many > different > >>>> shuffle service implementations in the foreseeable future. > >>> > >>> I agree that if we cannot find any other convincing names for the > >>> options, we should keep what we already have and change it if the > >>> alternative is convincing enough. > >>> The question is also whether we still want to rename it because it was > >>> "taskmanager.network.*memory*.*" in 1.9 but > "taskmanager.*memory*.network.*" > >>> is more aligned with other new memory option names. > >>> Or we can just 'un'-deprecate "taskmanager.network.*memory*.*". > >>> > >>> On Mon, Dec 23, 2019 at 8:42 PM Stephan Ewen <se...@apache.org> wrote: > >>> > >>>> How about putting "taskmanager.memory.flink.size" in the > configuration? > >>>> Then new downloaded Flink behaves similar to the previous Standalone > setups. > >>>> > >>>> If someone upgrades the binaries, but re-uses their old configuration, > >>>> then they get the compatibility as discussed previously. > >>>> We used that approach previously with the fine-grained failover > >>>> recovery. 
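[Editor's note: a minimal sketch of the kind of legacy-key mapping discussed here, i.e. routing the deprecated "taskmanager.heap.size" to different new options for standalone vs. active setups. This is a hypothetical helper for illustration, not the actual TaskExecutorResourceUtils code; only the key names come from the thread.]

import org.apache.flink.configuration.Configuration;

// Hypothetical sketch: the deprecated "taskmanager.heap.size" is mapped to
// "taskmanager.memory.flink.size" in standalone setups and to
// "taskmanager.memory.process.size" in active (Yarn/Mesos) setups, but only
// if none of the new options are explicitly configured.
public final class LegacyHeapSizeMapping {

    private static final String LEGACY_KEY = "taskmanager.heap.size";
    private static final String FLINK_SIZE = "taskmanager.memory.flink.size";
    private static final String PROCESS_SIZE = "taskmanager.memory.process.size";

    static void applyLegacyKey(Configuration conf, boolean activeSetup) {
        // Only fall back to the legacy key if the new options are not set.
        if (!conf.containsKey(LEGACY_KEY)
                || conf.containsKey(FLINK_SIZE)
                || conf.containsKey(PROCESS_SIZE)) {
            return;
        }
        String legacyValue = conf.getString(LEGACY_KEY, null);
        // Standalone: preserve the old "usable Flink memory" semantics.
        // Active setups: preserve the old "container size" semantics.
        conf.setString(activeSetup ? PROCESS_SIZE : FLINK_SIZE, legacyValue);
    }
}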
> >>>> > >>>> On Mon, Dec 23, 2019 at 3:27 AM Xintong Song <tonysong...@gmail.com> > >>>> wrote: > >>>> > >>>>> +1 to not have more options for off-heap memory, that would get > >>>>>> confusing fast. We can keep the name "off-heap" for both task and > >>>>>> framework, as long as they mean the same thing: native plus direct, > and > >>>>>> fully counted into MaxDirectMemory. I would suggest to update the > config > >>>>>> descriptions then to reflect that. > >>>>>> > >>>>> True, this should be explained in the config descriptions. > >>>>> > >>>>> Which configuration option will be set in Flink's default > >>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it > would > >>>>>> have to be the then deprecated taskmanager.heap.size config option. > If we > >>>>>> are ok with Yarn requesting slightly larger containers, then it > could also > >>>>>> be taskmanager.memory.total-flink.size. > >>>>>> > >>>>> Good point. Currently, we have > >>>>> "taskmanager.memory.total-process.size". In order to preserve the > previous > >>>>> behavior, we need to have "taskmanager.heap.size" so it can be > mapped to > >>>>> different new options in standalone / active setups. > >>>>> I think we can have the deprecated "taskmanager.heap.size" in the > >>>>> default flink-conf.yaml, and also have the > >>>>> new "taskmanager.memory.total-process.size" in a commented line. We > can > >>>>> explain how the deprecated config option behaves differently in the > >>>>> comments, so that user can switch to the new config options if they > want to. > >>>>> > >>>>> Thank you~ > >>>>> > >>>>> Xintong Song > >>>>> > >>>>> > >>>>> > >>>>> On Sat, Dec 21, 2019 at 1:00 AM Till Rohrmann <trohrm...@apache.org> > >>>>> wrote: > >>>>> > >>>>>> Thanks for the feedback and good discussion everyone. I left some > >>>>>> comments inline. > >>>>>> > >>>>>> On Fri, Dec 20, 2019 at 1:59 PM Stephan Ewen <se...@apache.org> > >>>>>> wrote: > >>>>>> > >>>>>>> +1 to not have more options for off-heap memory, that would get > >>>>>>> confusing fast. We can keep the name "off-heap" for both task and > >>>>>>> framework, as long as they mean the same thing: native plus > direct, and > >>>>>>> fully counted into MaxDirectMemory. I would suggest to update the > config > >>>>>>> descriptions then to reflect that. > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On Fri, Dec 20, 2019 at 1:03 PM Xintong Song < > tonysong...@gmail.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Regarding the framework/task direct/native memory options, I tend > >>>>>>>> to think it differently. I'm in favor of keep the > "*.off-heap.size" for the > >>>>>>>> config option keys. > >>>>>>>> > >>>>>>>> - It's not necessary IMO to expose the difference concepts of > >>>>>>>> direct / native memory to the users. > >>>>>>>> - I would avoid introducing more options for native memory if > >>>>>>>> possible. Taking fine grained resource management and dynamic > slot > >>>>>>>> allocation into consideration, that also means introduce more > fields into > >>>>>>>> ResourceSpec / ResourceProfile. > >>>>>>>> - My gut feeling is that having a relative loose > >>>>>>>> MaxDirectMemory should not be a big problem. > >>>>>>>> - In most cases, the task / framework off-heap memory should be > >>>>>>>> mainly (if not all) direct memory, so the difference > between derived > >>>>>>>> MaxDirectMemory and the ideal direct memory limit should > not be too much. 
> >>>>>>>> - We do not have a good way to know the exact size needed > >>>>>>>> for jvm overhead / metaspace and framework / task off-heap > memory, thus > >>>>>>>> having to conservatively reserve slightly more memory then > what actually > >>>>>>>> needed. Such reserved but not used memory can cover for the > small > >>>>>>>> MaxDirectMemory error. > >>>>>>>> - > >>>>>>>> - MaxDirectMemory is not the only way to trigger full gc. We > >>>>>>>> still heap activities that can also trigger the gc. > >>>>>>>> > >>>>>>>> Regarding the memory type config options, I've looked into the > >>>>>>>> latest ConfigOptions changes. I think it shouldn't be too > complicated to > >>>>>>>> change the config options to use memory type, and I can handle it > maybe > >>>>>>>> during your vacations. > >>>>>>>> > >>>>>>>> > >>>>>>>> Also agree with improving MemorySize logging and parsing. This > >>>>>>>> should not be a blocker that has to be done in 1.10. I would say > we finish > >>>>>>>> other works (testability, documentation and those discussed in > this thread) > >>>>>>>> first, and get to this only if we have time. > >>>>>>>> > >>>>>>>> > >>>>>>>> Thank you~ > >>>>>>>> > >>>>>>>> Xintong Song > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin < > >>>>>>>> azagrebin.apa...@gmail.com> wrote: > >>>>>>>> > >>>>>>>>> Hi Stephan and Xintong, > >>>>>>>>> > >>>>>>>>> Thanks for the further FLIP-49 feedbacks. > >>>>>>>>> > >>>>>>>>> - "taskmanager.memory.size" (old main config option) is > replaced > >>>>>>>>>> by "taskmanager.memory.total-process.size" which has a > different meaning in > >>>>>>>>>> standalone setups. The old option did not subtract metaspace > and other > >>>>>>>>>> overhead, while the new option does. That means that with the > default > >>>>>>>>>> config, standalone clusters get quite a bit less memory. > (independent of > >>>>>>>>>> managed memory going off heap). > >>>>>>>>>> I am wondering if we could interpret > >>>>>>>>>> "taskmanager.memory.size" as the deprecated key for > >>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line > with the old > >>>>>>>>>> mechanism (assuming managed memory is set to off heap). > >>>>>>>>>> The effect would be that the container size on Yarn/Mesos > >>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size", > we need to > >>>>>>>>>> add overhead and metaspace to reach the total process size, > rather than > >>>>>>>>>> cutting off memory. But if we want, we could even adjust for > that in the > >>>>>>>>>> active resource manager, getting full backwards compatibility > on that part. > >>>>>>>>>> Curious to hear more thoughts there. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I believe you mean "taskmanager.heap.size". > >>>>>>>>> > >>>>>>>>> I think the problem here is that the > >>>>>>>>> legacy "taskmanager.heap.size" was used differently in > standalone setups > >>>>>>>>> and active yarn / mesos setups, and such different calculation > logics and > >>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49. > Therefore, I'm > >>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size" > differently > >>>>>>>>> for standalone and active setups. > >>>>>>>>> > >>>>>>>>> I think what we really want is probably > >>>>>>>>> mapping "taskmanager.heap.size" to different new config options > in > >>>>>>>>> different setups. 
How about we mark "taskmanager.heap.size" as > deprecated > >>>>>>>>> key for neither of "taskmanager.memory.total-process.size" and > >>>>>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if > explicitly > >>>>>>>>> configured) in startup scripts / active resource managers, and > set the > >>>>>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and > >>>>>>>>> "taskmanager.memory.total-process.size" in active resource > managers (if the > >>>>>>>>> new config options are not configured). We can provide util > methods in > >>>>>>>>> TaskExecutorResourceUtils for such conversions, to keep all the > >>>>>>>>> configuration logics at one place. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I agree that the problem is that the legacy option > >>>>>>>>> ‘taskmanager.heap.size’ has different semantics for > standalone/container. > >>>>>>>>> We had it initially falling back to > 'taskmanager.memory.total-flink.size’ > >>>>>>>>> but I changed that to align it with container cut-off. Now I see > it changes > >>>>>>>>> standalone setup then. > >>>>>>>>> +1 for supporting its backwards compatibility differently for > >>>>>>>>> standalone/container setups. > >>>>>>>>> > >>>>>>>> > >>>>>> Which configuration option will be set in Flink's default > >>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it > would > >>>>>> have to be the then deprecated taskmanager.heap.size config option. > If we > >>>>>> are ok with Yarn requesting slightly larger containers, then it > could also > >>>>>> be taskmanager.memory.total-flink.size. > >>>>>> > >>>>>>> > >>>>>>>>> > >>>>>>>>> - Mini Cluster tries to imitate exact ratio of memory pools as > a > >>>>>>>>>> standalone setup. I get the idea behind that, but I am > wondering if it is > >>>>>>>>>> the right approach here. > >>>>>>>>>> For example: I started a relatively large JVM (large heap > >>>>>>>>>> size of 10 GB) as a test. With the current logic, the system > tries to > >>>>>>>>>> reserve an additional 6GB for managed memory which is more than > there is > >>>>>>>>>> memory left. When you see the error that no memory could be > allocated, you > >>>>>>>>>> need to understand the magic of how this is derived. > >>>>>>>>>> I am trying to think about this from the perspective of > using > >>>>>>>>>> "Flink as a Library", which the MiniCluster is close to. > >>>>>>>>>> When starting Flink out of a running process, we cannot > >>>>>>>>>> assume that we are the only users of that process and that we > can mold the > >>>>>>>>>> process to our demands. I think a fix value for managed memory > and network > >>>>>>>>>> memory would feel more natural in such a setup than a mechanism > that is > >>>>>>>>>> tailored towards exclusive use of the process. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +1 on having fixed values for managed / shuffle memory. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> also +1 for that, if user has not specified any main options to > >>>>>>>>> derive memory. We should also log this fixing of managed / > shuffle memory. > >>>>>>>>> And just noticed, we could also sanity check framework and if > >>>>>>>>> explicitly configured task heap against available JVM heap, and > at least > >>>>>>>>> log inconsistencies. > >>>>>>>>> > >>>>>>>>> - Some off-heap memory goes into direct memory, some does not. > >>>>>>>>>> This confused me a bit. 
For example > >>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into > >>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size" > is counted as > >>>>>>>>>> native memory. Maybe we should rename the keys to reflect that. > There is no > >>>>>>>>>> one "off heap" memory type after all. Maybe use > >>>>>>>>>> "taskmanager.memory.task.native: XXXmb" and > >>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also > >>>>>>>>> accounted in the max direct memory size limit. The confusion > probably comes > >>>>>>>>> from that "taskmanager.memory.framework.off-heap.size" > explicitly mentioned > >>>>>>>>> that in its description while > "taskmanager.memory.task.off-heap.size" > >>>>>>>>> didn't. That's actually because the framework off-heap memory is > introduced > >>>>>>>>> later in a separate commit. We should fix that. > >>>>>>>>> > >>>>>>>>> For framework / task off-heap memory, we do not differentiate > >>>>>>>>> direct / native memory usage. That means the configure value for > these two > >>>>>>>>> options could be a mixture of direct / native memory. Since we > do not know > >>>>>>>>> the portion of direct memory out of the configured value, we have > >>>>>>>>> to conservatively account it all into the max direct memory size > limit. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> In that case, I am a bit confused. For the total size > >>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory? > It is a > >>>>>>>>> difficult parameter, and the main reason to set it was (if I > recall > >>>>>>>>> correctly) to trigger GC based on direct memory allocation (to > free heap > >>>>>>>>> structures that then in turn release direct memory). If the > limit is > >>>>>>>>> anyways too high (because we also count native memory in there) > such that > >>>>>>>>> we can exceed the total process (container) memory, why do we > set it then?* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I always also thought about it as providing more safety net for > >>>>>>>>> direct allocations but GC thing looks more important. > >>>>>>>>> > >>>>>>>>> +1 for fixing docs for 'taskmanager.memory.task.off-heap.size’ > and > >>>>>>>>> renaming to ‘direct' as this is what really happens > >>>>>>>>> if we want to support direct limit more exact than now. > >>>>>>>>> > >>>>>>>>> I also think that it is hard to separate direct / native memory > >>>>>>>>> unless we introduce even more options. > >>>>>>>>> If user wants to keep the direct limit tight to a certain value > >>>>>>>>> but also use native memory outside of it, > >>>>>>>>> she would have to increase something else, like JVM overhead to > >>>>>>>>> account for it and there is no other better way. > >>>>>>>>> Having more options to account for the native memory outside of > >>>>>>>>> direct limit complicates things but can be introduced later if > needed. > >>>>>>>>> > >>>>>>>>> - What do you think about renaming > >>>>>>>>>> "taskmanager.memory.total-flink.size" to > "taskmanager.memory.flink.size" > >>>>>>>>>> and "taskmanager.memory.total-process.size" to > >>>>>>>>>> "taskmanager.memory.process.size" (or > "taskmanager.memory.jvm.total"). I > >>>>>>>>>> think these keys may be a bit less clumsy (dropping the > "total-") without > >>>>>>>>>> loss of expressiveness. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +1 on this. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +1 as well. 
Also an option: > >>>>>>>>> 'taskmanager.memory.total-process.size’ -> > >>>>>>>>> ‘taskmanager.memory.jvm.process.size’, > >>>>>>>>> although it can be also mentioned in docs that we mean JVM > process. > >>>>>>>>> > >>>>>>>> > >>>>>> I'd be in favour of Stephan's proposal for the config keys as > shorter > >>>>>> is usually better and they are still descriptive enough. Between > >>>>>> "taskmanager.memory.process.size" and > "taskmanager.memory.jvm.total" I > >>>>>> would slightly favour the first variant. > >>>>>> > >>>>>>> > >>>>>>>>> - The network memory keys are now called > >>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is > usually > >>>>>>>>>> understood as a redistribution (random, or maybe by hash of > key). As an > >>>>>>>>>> example, there are many discussions about "shuffle join versus > broadcast > >>>>>>>>>> join", where "shuffle" is the synonym for "re-partitioning". We > use that > >>>>>>>>>> memory however for all network operations, like forward pipes, > broadcasts, > >>>>>>>>>> receiver-side buffering on checkpoints, etc. I find the name > "*.shuffle.*" > >>>>>>>>>> confusing, I am wondering if users would find that as well. So > throwing in > >>>>>>>>>> the suggestion to call the options > "taskmanager.memory.network.*". > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On > the > >>>>>>>>> other hand, one can also argue that this part of memory is used > by > >>>>>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" > points more > >>>>>>>>> directly to the shuffle service components. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit > >>>>>>>>> incorrect, because it is doing not only shuffles as well. The > >>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too > critical. > >>>>>>>>> This isn't super high priority for me, but if we want to adjust > it, better > >>>>>>>>> earlier than later.* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> This is also a bit controversial topic for me. Indeed, we have > >>>>>>>>> always used ’network’ for this concept of task data shuffling > over the > >>>>>>>>> network and this can confuse existing users. > >>>>>>>>> > >>>>>>>>> On the other hand for the new users and in a long term, ’network’ > >>>>>>>>> can delude into a conclusion that all network memory is managed > by this > >>>>>>>>> option. > >>>>>>>>> Also other types of shuffle might not directly deal with network > >>>>>>>>> at all. > >>>>>>>>> > >>>>>>>>> By calling it shuffle, we were somewhat biased by understanding > it > >>>>>>>>> in term of map/reduce. This is rather an inter-task data > exchange. > >>>>>>>>> Maybe then 'taskmanager.memory.shuffle.communication.*’ or > >>>>>>>>> ‘taskmanager.memory.task.shuffle/communication/io/network.*’. > >>>>>>>>> > >>>>>>>> > >>>>>> From a user's perspective I believe "taskmanager.memory.network" > >>>>>> would be easier to understand as not everyone knows exactly what the > >>>>>> shuffle service is. I see the point that it would be a bit > imprecise as we > >>>>>> can have different shuffle implementations but I would go with the > ease of > >>>>>> use/understanding here. Moreover, I think that we won't have many > different > >>>>>> shuffle service implementations in the foreseeable future. 
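[Editor's note: to make the MaxDirectMemory accounting discussed above concrete, a small illustrative sketch of what follows from the points agreed so far: framework and task off-heap memory are both fully counted into -XX:MaxDirectMemorySize, JVM overhead is not. The assumption that network/shuffle buffers are direct memory and therefore included is taken from FLIP-49, not stated explicitly in this thread; all values are made up.]

// Illustrative accounting of -XX:MaxDirectMemorySize as discussed in this
// thread: framework + task off-heap are counted conservatively (they may be a
// mix of direct and native memory), network/shuffle buffers are assumed to be
// direct memory, and JVM overhead is deliberately left out. Values are assumptions.
public class MaxDirectMemorySketch {

    public static void main(String[] args) {
        long frameworkOffHeapMb = 128;  // taskmanager.memory.framework.off-heap.size
        long taskOffHeapMb = 0;         // taskmanager.memory.task.off-heap.size
        long networkMb = 64;            // taskmanager.memory.shuffle.* / network.*
        long jvmOverheadMb = 192;       // taskmanager.memory.jvm-overhead.* (NOT counted)

        long maxDirectMb = frameworkOffHeapMb + taskOffHeapMb + networkMb;

        System.out.println("-XX:MaxDirectMemorySize=" + maxDirectMb + "m"
                + " (jvm-overhead " + jvmOverheadMb + "m excluded)");
    }
}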
> >>>>>> > >>>>>>> > >>>>>>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" > >>>>>>>>>> keys say that it also accounts for I/O direct memory, but the > parameter is > >>>>>>>>>> not counted into the MaxDirectMemory parameter. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> True. Since we already have framework off-heap memory accounted > >>>>>>>>> for ad hoc direct memory usages, accounting all of jvm-overhead > also into > >>>>>>>>> max direct memory limit seems not necessary. I would suggest to > remove "I/O > >>>>>>>>> direct memory" from the description, and explicitly mention that > this > >>>>>>>>> option does not account for direct memory and will not be > accounted into > >>>>>>>>> max direct memory limit. WDYT? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +1 > >>>>>>>>> > >>>>>>>>> - Can make the new ConfigOptions strongly typed with the new > >>>>>>>>>> configuration options. For example, directly use MemorySize > typed options. > >>>>>>>>>> That makes validation automatic and saves us from breaking the > options > >>>>>>>>>> later. > >>>>>>>>> > >>>>>>>>> +1. Wasn't aware of the new memory type config options. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> +1 > >>>>>>>>> > >>>>>>>>> *==> Thanks. Do you need help with adjusting this?* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> I would appreciate it. > >>>>>>>>> > >>>>>>>>> Also small side note, we extensively use MemorySize in logs now > >>>>>>>>> but it might be not always readable as its string representation > is only in > >>>>>>>>> bytes atm > >>>>>>>>> and does not reduce it to kb/mb/etc in case of big bytes value. > We > >>>>>>>>> could have at least some .prettyPrint function to use in logs. > >>>>>>>>> And .fromMegabytes/etc factory methods would improve code > >>>>>>>>> readability instead of .parse(int + “m”). > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Andrey > >>>>>>>>> > >>>>>>>>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote: > >>>>>>>>> > >>>>>>>>> Hi Xintong! > >>>>>>>>> > >>>>>>>>> Please find my answers inline: > >>>>>>>>> > >>>>>>>>>> - "taskmanager.memory.size" (old main config option) is > >>>>>>>>>>> replaced by "taskmanager.memory.total-process.size" which has > a different > >>>>>>>>>>> meaning in standalone setups. The old option did not subtract > metaspace and > >>>>>>>>>>> other overhead, while the new option does. That means that > with the default > >>>>>>>>>>> config, standalone clusters get quite a bit less memory. > (independent of > >>>>>>>>>>> managed memory going off heap). > >>>>>>>>>>> I am wondering if we could interpret > >>>>>>>>>>> "taskmanager.memory.size" as the deprecated key for > >>>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line > with the old > >>>>>>>>>>> mechanism (assuming managed memory is set to off heap). > >>>>>>>>>>> The effect would be that the container size on Yarn/Mesos > >>>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size", > we need to > >>>>>>>>>>> add overhead and metaspace to reach the total process size, > rather than > >>>>>>>>>>> cutting off memory. But if we want, we could even adjust for > that in the > >>>>>>>>>>> active resource manager, getting full backwards compatibility > on that part. > >>>>>>>>>>> Curious to hear more thoughts there. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I believe you mean "taskmanager.heap.size". 
> >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> Yes* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> I think the problem here is that the > >>>>>>>>>> legacy "taskmanager.heap.size" was used differently in > standalone setups > >>>>>>>>>> and active yarn / mesos setups, and such different calculation > logics and > >>>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49. > Therefore, I'm > >>>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size" > differently > >>>>>>>>>> for standalone and active setups. > >>>>>>>>>> > >>>>>>>>>> I think what we really want is probably > >>>>>>>>>> mapping "taskmanager.heap.size" to different new config options > in > >>>>>>>>>> different setups. How about we mark "taskmanager.heap.size" as > deprecated > >>>>>>>>>> key for neither of "taskmanager.memory.total-process.size" and > >>>>>>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if > explicitly > >>>>>>>>>> configured) in startup scripts / active resource managers, and > set the > >>>>>>>>>> value to "taskmanager.memory.total-flink.size" in the scripts > and > >>>>>>>>>> "taskmanager.memory.total-process.size" in active resource > managers (if the > >>>>>>>>>> new config options are not configured). We can provide util > methods in > >>>>>>>>>> TaskExecutorResourceUtils for such conversions, to keep all the > >>>>>>>>>> configuration logics at one place. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> This is pretty much what I meant as well (maybe my > >>>>>>>>> description was not very clear), so +1 for that mechanism* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> - Mini Cluster tries to imitate exact ratio of memory pools as > a > >>>>>>>>>>> standalone setup. I get the idea behind that, but I am > wondering if it is > >>>>>>>>>>> the right approach here. > >>>>>>>>>>> For example: I started a relatively large JVM (large heap > >>>>>>>>>>> size of 10 GB) as a test. With the current logic, the system > tries to > >>>>>>>>>>> reserve an additional 6GB for managed memory which is more > than there is > >>>>>>>>>>> memory left. When you see the error that no memory could be > allocated, you > >>>>>>>>>>> need to understand the magic of how this is derived. > >>>>>>>>>>> I am trying to think about this from the perspective of > >>>>>>>>>>> using "Flink as a Library", which the MiniCluster is close to. > >>>>>>>>>>> When starting Flink out of a running process, we cannot > >>>>>>>>>>> assume that we are the only users of that process and that we > can mold the > >>>>>>>>>>> process to our demands. I think a fix value for managed memory > and network > >>>>>>>>>>> memory would feel more natural in such a setup than a > mechanism that is > >>>>>>>>>>> tailored towards exclusive use of the process. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> +1 on having fixed values for managed / shuffle memory. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> Cool, let's also see what Andrey and Till think here.* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> - Some off-heap memory goes into direct memory, some does not. > >>>>>>>>>>> This confused me a bit. For example > >>>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into > >>>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size" > is counted as > >>>>>>>>>>> native memory. Maybe we should rename the keys to reflect > that. There is no > >>>>>>>>>>> one "off heap" memory type after all. 
Maybe use > >>>>>>>>>>> "taskmanager.memory.task.native: XXXmb" and > >>>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also > >>>>>>>>>> accounted in the max direct memory size limit. The confusion > probably comes > >>>>>>>>>> from that "taskmanager.memory.framework.off-heap.size" > explicitly mentioned > >>>>>>>>>> that in its description while > "taskmanager.memory.task.off-heap.size" > >>>>>>>>>> didn't. That's actually because the framework off-heap memory > is introduced > >>>>>>>>>> later in a separate commit. We should fix that. > >>>>>>>>>> > >>>>>>>>>> For framework / task off-heap memory, we do not differentiate > >>>>>>>>>> direct / native memory usage. That means the configure value > for these two > >>>>>>>>>> options could be a mixture of direct / native memory. Since we > do not know > >>>>>>>>>> the portion of direct memory out of the configured value, we > have > >>>>>>>>>> to conservatively account it all into the max direct memory > size limit. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> In that case, I am a bit confused. For the total size > >>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory? > It is a > >>>>>>>>> difficult parameter, and the main reason to set it was (if I > recall > >>>>>>>>> correctly) to trigger GC based on direct memory allocation (to > free heap > >>>>>>>>> structures that then in turn release direct memory). If the > limit is > >>>>>>>>> anyways too high (because we also count native memory in there) > such that > >>>>>>>>> we can exceed the total process (container) memory, why do we > set it then?* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> - The network memory keys are now called > >>>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is > usually > >>>>>>>>>>> understood as a redistribution (random, or maybe by hash of > key). As an > >>>>>>>>>>> example, there are many discussions about "shuffle join versus > broadcast > >>>>>>>>>>> join", where "shuffle" is the synonym for "re-partitioning". > We use that > >>>>>>>>>>> memory however for all network operations, like forward pipes, > broadcasts, > >>>>>>>>>>> receiver-side buffering on checkpoints, etc. I find the name > "*.shuffle.*" > >>>>>>>>>>> confusing, I am wondering if users would find that as well. So > throwing in > >>>>>>>>>>> the suggestion to call the options > "taskmanager.memory.network.*". > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On > >>>>>>>>>> the other hand, one can also argue that this part of memory is > used by > >>>>>>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" > points more > >>>>>>>>>> directly to the shuffle service components. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit > >>>>>>>>> incorrect, because it is doing not only shuffles as well. The > >>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too > critical. > >>>>>>>>> This isn't super high priority for me, but if we want to adjust > it, better > >>>>>>>>> earlier than later.* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> - The descriptions for the "taskmanager.memory.jvm-overhead.*" > >>>>>>>>> keys say that it also accounts for I/O direct memory, but the > parameter is > >>>>>>>>> not counted into the MaxDirectMemory parameter. > >>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> True. 
Since we already have framework off-heap memory accounted > >>>>>>>>>> for ad hoc direct memory usages, accounting all of jvm-overhead > also into > >>>>>>>>>> max direct memory limit seems not necessary. I would suggest to > remove "I/O > >>>>>>>>>> direct memory" from the description, and explicitly mention > that this > >>>>>>>>>> option does not account for direct memory and will not be > accounted into > >>>>>>>>>> max direct memory limit. WDYT? > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> Sounds good. * > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> - Can make the new ConfigOptions strongly typed with the new > >>>>>>>>>>> configuration options. For example, directly use MemorySize > typed options. > >>>>>>>>>>> That makes validation automatic and saves us from breaking the > options > >>>>>>>>>>> later. > >>>>>>>>>> > >>>>>>>>>> +1. Wasn't aware of the new memory type config options. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> *==> Thanks. Do you need help with adjusting this?* > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Stephan > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> >
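[Editor's note: two small sketches related to the last points above. The first shows roughly what a MemorySize-typed option looks like with the new typed ConfigOptions builder (key, default, and description are placeholders). The second is a sketch of the "prettyPrint" helper Andrey asks for, i.e. printing a MemorySize with the largest whole unit instead of raw bytes; it is not existing Flink API at the time of this thread.]

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

public class MemorySizeExamples {

    // Roughly what a strongly typed MemorySize option looks like with the new
    // ConfigOptions builder; key, default value and description are placeholders.
    public static final ConfigOption<MemorySize> EXAMPLE_OPTION =
            ConfigOptions.key("taskmanager.memory.framework.off-heap.size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("128m"))
                    .withDescription("Framework off-heap memory: direct and native memory,"
                            + " fully counted into MaxDirectMemorySize.");

    // Sketch of the proposed "prettyPrint": reduce a byte count to the largest
    // whole binary unit, instead of always logging raw bytes. Not Flink API.
    static String prettyPrint(MemorySize size) {
        long value = size.getBytes();
        String[] units = {"b", "kb", "mb", "gb", "tb"};
        int unit = 0;
        while (value % 1024 == 0 && unit < units.length - 1) {
            value /= 1024;
            unit++;
        }
        return value + units[unit];
    }

    public static void main(String[] args) {
        // Prints "1536mb" rather than "1610612736" bytes.
        System.out.println(prettyPrint(MemorySize.parse("1536m")));
    }
}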