"taskmanager.memory.flink.size" in the default config has a few advantages.

 - The value in the default config needs to be suitable for "trying out"
Flink, for a good "getting started" experience.

  - For trying out Flink, standalone is the most common entry point (apart
from running in the IDE).

  - In standalone setup, from total process memory, we subtract quite a bit
before we arrive at the usable memory. We also subtract managed memory from
the heap now. I fear we might end up at a heap that becomes so small that
it makes for a bad "getting started" experience.

  - I don't think it is realistic that users set the process memory to the
full machine memory. In most cases, a lot else is running on the machine as
well.

  - In the JVM world, users are used to configuring the heap size and know
that there is additional memory overhead. The
"taskmanager.memory.flink.size" option fits well with that mindset.

  - Once you start to think about the total process memory of Yarn
containers, you are already past the getting-started phase and in the
tuning phase.
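
To make the arithmetic behind the points above concrete, here is a minimal
Python sketch. The metaspace and JVM overhead figures are made-up
placeholders, not Flink's real defaults, and the class and function names are
hypothetical. It only illustrates that the same number means something
different depending on which of the two options it is assigned to.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JvmExtras:
    """Memory the JVM process needs on top of Flink's own memory.

    Both fields are illustrative placeholders, not Flink defaults.
    """
    metaspace_mb: int
    overhead_mb: int


def flink_size_from_process_size(process_mb: int, extras: JvmExtras) -> int:
    """Subtract metaspace and overhead from the total process size."""
    flink_mb = process_mb - extras.metaspace_mb - extras.overhead_mb
    if flink_mb <= 0:
        raise ValueError("process size is too small to leave any Flink memory")
    return flink_mb


def process_size_from_flink_size(flink_mb: int, extras: JvmExtras) -> int:
    """Add metaspace and overhead on top of the total Flink size."""
    return flink_mb + extras.metaspace_mb + extras.overhead_mb


if __name__ == "__main__":
    extras = JvmExtras(metaspace_mb=100, overhead_mb=200)
    # The same configured number, interpreted as each of the two options.
    print(flink_size_from_process_size(1024, extras))
    print(process_size_from_flink_size(1024, extras))
```

With these placeholder numbers, 1024 MB given as the process size leaves 724 MB
for Flink, while 1024 MB given as the Flink size results in a 1324 MB process.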


On Tue, Dec 24, 2019, 10:25 Andrey Zagrebin <azagre...@apache.org> wrote:

> Thanks for the summary, Xintong! It makes sense to me.
>
> > How about putting "taskmanager.memory.flink.size" in the configuration?
> > Then new downloaded Flink behaves similar to the previous Standalone
> > setups.
> > If someone upgrades the binaries, but re-uses their old configuration,
> > then they get the compatibility as discussed previously.
> > We used that approach previously with the fine-grained failover recovery.
>
>
>
> > I'm trying to understand why "taskmanager.memory.flink.size" rather than
> > "taskmanager.memory.process.size" in the default flink-conf.yaml. Or put
> > it another way, why do we want the newly downloaded Flink to behave like
> > previous standalone setups rather than previous active mode setups? Is
> > there any special reason that I overlooked, which makes backwards
> > compatibility for standalone setups more important than for active
> > setups?
> > IMO, "taskmanager.memory.process.size" is easier for newcomers. For
> > standalone setups, users can simply configure it to their machines'
> > available memory size, without needing to worry about leaving enough
> > space for JVM overhead / metaspace. For containerized setups, it's more
> > predictable how much memory the containers / Flink could use, which is
> > friendlier for users managing their resource quota.
> > Therefore, unless there is anything I overlooked, I'm in favor of putting
> > "taskmanager.memory.process.size" rather than
> > "taskmanager.memory.flink.size" in the default configuration.
>
>
> I agree that having "taskmanager.memory.process.size" in the default config
> should be easier for new users to understand and tweak, because it is
> simply what they are ready to spend on the Flink application.
> The problem is that when users upgrade Flink and use the new default
> configuration, the behaviour can change:
> - if we put process memory, then Flink memory shrinks and the new
> default option contradicts their previous understanding;
> - if we put Flink memory, then a larger container is requested.
> The shrinking of memory sounds more implicit and worse. The increased
> container request will just fail in the worst case, and the memory setup
> can then be revisited.
> We could increase the default "taskmanager.memory.process.size" to better
> align it with the previous default setup for standalone, but this would
> not remove the possible confusion for existing users. On the other hand,
> the option is new and we can add a comment on how to migrate from the
> old one.
>
> All in all, I now also tend towards having "taskmanager.memory.process.size"
> in the default config, unless there are more reasons to reduce confusion
> for the old standalone users.
>
> Best,
> Andrey
>
> On Tue, Dec 24, 2019 at 5:50 AM Xintong Song <tonysong...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > It seems we have already reached consensus on most of the issues. Thanks
> > everyone for the good discussion.
> >
> > While there are still open questions under discussion, I'd like to
> > summarize the discussion so far, and list the action items that we have
> > already reached consensus on. In this way, we can already start working
> > on these items while discussing the remaining open questions. Please
> > correct me if the summary does not reflect your argument.
> >
> > *Action items that we have already reached consensus on:*
> >
> >    - Map "taskmanager.heap.size" to "taskmanager.memory.flink.size" for
> >    standalone setups, and "taskmanager.memory.process.size" for active
> >    setups. (should be mentioned in release notes)
> >    - If not explicitly configured, MiniCluster should have fixed default
> >    network and managed memory sizes. (should be mentioned in docs &
> >    release notes)
> >    - Change the memory config options' type from String to MemorySize
> >    - Change the config option key "taskmanager.memory.total-flink.size"
> >    to "taskmanager.memory.flink.size"
> >    - Change the config option key "taskmanager.memory.total-process.size"
> >    to "taskmanager.memory.process.size"
> >    - Update descriptions for "taskmanager.memory.framework.off-heap.size"
> >    and "taskmanager.memory.task.off-heap.size" to explicitly state that
> >       - Both direct and native memory are accounted
> >       - Will be fully counted into MaxDirectMemorySize
> >    - Update descriptions for
> >    "taskmanager.memory.jvm-overhead.[min|max|fraction]" to remove "I/O
> >    direct memory" and explicitly state that it's not counted into
> >    MaxDirectMemorySize
> >    - Print MemorySize with proper unit. (non-blocker for 1.10)
> >
> >
> > *Questions that are still open:*
> >
> >    - Which config option do we put in the default flink-conf.yaml?
> >       - "taskmanager.memory.flink.size"
> >       - "taskmanager.memory.process.size"
> >       - Deprecated "taskmanager.heap.size"
> >    - What are the proper keys for network / shuffle memory?
> >       - "taskmanager.memory.shuffle.*"
> >       - "taskmanager.memory.network.*"
> >       - "taskmanager.network.memory.*"
> >
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Dec 24, 2019 at 10:19 AM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> >>> How about putting "taskmanager.memory.flink.size" in the configuration?
> >>> Then new downloaded Flink behaves similar to the previous Standalone
> >>> setups.
> >>> If someone upgrades the binaries, but re-uses their old configuration,
> >>> then they get the compatibility as discussed previously.
> >>> We used that approach previously with the fine-grained failover
> >>> recovery.
> >>
> >>
> >> I'm trying to understand why "taskmanager.memory.flink.size" rather than
> >> "taskmanager.memory.process.size" in the default flink-conf.yaml. Or
> >> put it another way, why do we want the newly downloaded Flink to behave
> >> like previous standalone setups rather than previous active mode setups?
> >> Is there any special reason that I overlooked, which makes backwards
> >> compatibility for standalone setups more important than for active
> >> setups?
> >>
> >> IMO, "taskmanager.memory.process.size" is easier for newcomers. For
> >> standalone setups, users can simply configure it to their machines'
> >> available memory size, without needing to worry about leaving enough
> >> space for JVM overhead / metaspace. For containerized setups, it's more
> >> predictable how much memory the containers / Flink could use, which is
> >> friendlier for users managing their resource quota.
> >>
> >> Therefore, unless there is anything I overlooked, I'm in favor of
> >> putting "taskmanager.memory.process.size" rather than
> >> "taskmanager.memory.flink.size" in the default configuration.
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Dec 24, 2019 at 4:27 AM Andrey Zagrebin <azagre...@apache.org>
> >> wrote:
> >>
> >>>> How about putting "taskmanager.memory.flink.size" in the configuration?
> >>>> Then new downloaded Flink behaves similar to the previous Standalone
> >>>> setups.
> >>>> If someone upgrades the binaries, but re-uses their old configuration,
> >>>> then they get the compatibility as discussed previously.
> >>>> We used that approach previously with the fine-grained failover
> >>>> recovery.
> >>>
> >>> +1, this sounds like a good compromise.
> >>>
> >>>> +1 to not have more options for off-heap memory, that would get
> >>>> confusing fast. We can keep the name "off-heap" for both task and
> >>>> framework, as long as they mean the same thing: native plus direct,
> >>>> and fully counted into MaxDirectMemory. I would suggest to update the
> >>>> config descriptions then to reflect that.
> >>>>
> >>> True, this should be explained in the config descriptions.
> >>>
> >>> looks good to me
> >>>
> >>>> From a user's perspective I believe "taskmanager.memory.network" would
> >>>> be easier to understand as not everyone knows exactly what the shuffle
> >>>> service is. I see the point that it would be a bit imprecise as we
> >>>> can have different shuffle implementations but I would go with the
> >>>> ease of use/understanding here. Moreover, I think that we won't have
> >>>> many different shuffle service implementations in the foreseeable
> >>>> future.
> >>>
> >>> I agree that if we cannot find any other convincing names for the
> >>> options, we should keep what we already have and change it if the
> >>> alternative is convincing enough.
> >>> The question is also whether we still want to rename it because it was
> >>> "taskmanager.network.*memory*.*" in 1.9 but
> >>> "taskmanager.*memory*.network.*"
> >>> is more aligned with other new memory option names.
> >>> Or we can just 'un'-deprecate "taskmanager.network.*memory*.*".
> >>>
> >>> On Mon, Dec 23, 2019 at 8:42 PM Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>>> How about putting "taskmanager.memory.flink.size" in the
> >>>> configuration? Then new downloaded Flink behaves similar to the
> >>>> previous Standalone setups.
> >>>>
> >>>> If someone upgrades the binaries, but re-uses their old configuration,
> >>>> then they get the compatibility as discussed previously.
> >>>> We used that approach previously with the fine-grained failover
> >>>> recovery.
> >>>>
> >>>> On Mon, Dec 23, 2019 at 3:27 AM Xintong Song <tonysong...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>>> +1 to not have more options for off-heap memory, that would get
> >>>>>> confusing fast. We can keep the name "off-heap" for both task and
> >>>>>> framework, as long as they mean the same thing: native plus direct,
> >>>>>> and fully counted into MaxDirectMemory. I would suggest to update
> >>>>>> the config descriptions then to reflect that.
> >>>>>>
> >>>>> True, this should be explained in the config descriptions.
> >>>>>
> >>>>>> Which configuration option will be set in Flink's default
> >>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it
> >>>>>> would have to be the then deprecated taskmanager.heap.size config
> >>>>>> option. If we are ok with Yarn requesting slightly larger
> >>>>>> containers, then it could also be
> >>>>>> taskmanager.memory.total-flink.size.
> >>>>>>
> >>>>> Good point. Currently, we have
> >>>>> "taskmanager.memory.total-process.size". In order to preserve the
> >>>>> previous behavior, we need to have "taskmanager.heap.size" so it can
> >>>>> be mapped to different new options in standalone / active setups.
> >>>>> I think we can have the deprecated "taskmanager.heap.size" in the
> >>>>> default flink-conf.yaml, and also have the
> >>>>> new "taskmanager.memory.total-process.size" in a commented line. We
> >>>>> can explain how the deprecated config option behaves differently in
> >>>>> the comments, so that users can switch to the new config options if
> >>>>> they want to.
> >>>>>
> >>>>> Thank you~
> >>>>>
> >>>>> Xintong Song
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sat, Dec 21, 2019 at 1:00 AM Till Rohrmann <trohrm...@apache.org>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the feedback and good discussion everyone. I left some
> >>>>>> comments inline.
> >>>>>>
> >>>>>> On Fri, Dec 20, 2019 at 1:59 PM Stephan Ewen <se...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> +1 to not have more options for off-heap memory, that would get
> >>>>>>> confusing fast. We can keep the name "off-heap" for both task and
> >>>>>>> framework, as long as they mean the same thing: native plus
> >>>>>>> direct, and fully counted into MaxDirectMemory. I would suggest to
> >>>>>>> update the config descriptions then to reflect that.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <
> tonysong...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Regarding the framework/task direct/native memory options, I tend
> >>>>>>>> to think about it differently. I'm in favor of keeping
> >>>>>>>> "*.off-heap.size" for the config option keys.
> >>>>>>>>
> >>>>>>>>    - It's not necessary IMO to expose the different concepts of
> >>>>>>>>    direct / native memory to the users.
> >>>>>>>>    - I would avoid introducing more options for native memory if
> >>>>>>>>    possible. Taking fine grained resource management and dynamic
> >>>>>>>>    slot allocation into consideration, that also means introducing
> >>>>>>>>    more fields into ResourceSpec / ResourceProfile.
> >>>>>>>>    - My gut feeling is that having a relatively loose
> >>>>>>>>    MaxDirectMemory should not be a big problem.
> >>>>>>>>       - In most cases, the task / framework off-heap memory should be
> >>>>>>>>       mainly (if not all) direct memory, so the difference between
> >>>>>>>>       the derived MaxDirectMemory and the ideal direct memory limit
> >>>>>>>>       should not be too much.
> >>>>>>>>       - We do not have a good way to know the exact size needed
> >>>>>>>>       for jvm overhead / metaspace and framework / task off-heap
> >>>>>>>>       memory, thus having to conservatively reserve slightly more
> >>>>>>>>       memory than what is actually needed. Such reserved but unused
> >>>>>>>>       memory can cover for the small MaxDirectMemory error.
> >>>>>>>>       - MaxDirectMemory is not the only way to trigger full gc. We
> >>>>>>>>       still have heap activities that can also trigger the gc.
> >>>>>>>>
> >>>>>>>> Regarding the memory type config options, I've looked into the
> >>>>>>>> latest ConfigOptions changes. I think it shouldn't be too
> >>>>>>>> complicated to change the config options to use memory type, and I
> >>>>>>>> can handle it maybe during your vacations.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Also agree with improving MemorySize logging and parsing. This
> >>>>>>>> should not be a blocker that has to be done in 1.10. I would say
> >>>>>>>> we finish other work (testability, documentation and those
> >>>>>>>> discussed in this thread) first, and get to this only if we have
> >>>>>>>> time.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thank you~
> >>>>>>>>
> >>>>>>>> Xintong Song
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin <
> >>>>>>>> azagrebin.apa...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Stephan and Xintong,
> >>>>>>>>>
> >>>>>>>>> Thanks for the further FLIP-49 feedback.
> >>>>>>>>>
> >>>>>>>>>   - "taskmanager.memory.size" (old main config option) is
> replaced
> >>>>>>>>>> by "taskmanager.memory.total-process.size" which has a
> different meaning in
> >>>>>>>>>> standalone setups. The old option did not subtract metaspace
> and other
> >>>>>>>>>> overhead, while the new option does. That means that with the
> default
> >>>>>>>>>> config, standalone clusters get quite a bit less memory.
> (independent of
> >>>>>>>>>> managed memory going off heap).
> >>>>>>>>>>     I am wondering if we could interpret
> >>>>>>>>>> "taskmanager.memory.size" as the deprecated key for
> >>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line
> with the old
> >>>>>>>>>> mechanism (assuming managed memory is set to off heap).
> >>>>>>>>>>     The effect would be that the container size on Yarn/Mesos
> >>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size",
> we need to
> >>>>>>>>>> add overhead and metaspace to reach the total process size,
> rather than
> >>>>>>>>>> cutting off memory. But if we want, we could even adjust for
> that in the
> >>>>>>>>>> active resource manager, getting full backwards compatibility
> on that part.
> >>>>>>>>>>     Curious to hear more thoughts there.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I believe you mean "taskmanager.heap.size".
> >>>>>>>>>
> >>>>>>>>> I think the problem here is that the
> >>>>>>>>> legacy "taskmanager.heap.size" was used differently in
> standalone setups
> >>>>>>>>> and active yarn / mesos setups, and such different calculation
> logics and
> >>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49.
> Therefore, I'm
> >>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size"
> differently
> >>>>>>>>> for standalone and active setups.
> >>>>>>>>>
> >>>>>>>>> I think what we really want is probably
> >>>>>>>>> mapping "taskmanager.heap.size" to different new config options
> in
> >>>>>>>>> different setups. How about we mark "taskmanager.heap.size" as
> deprecated
> >>>>>>>>> key for neither of "taskmanager.memory.total-process.size" and
> >>>>>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if
> explicitly
> >>>>>>>>> configured) in startup scripts / active resource managers, and
> set the
> >>>>>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and
> >>>>>>>>> "taskmanager.memory.total-process.size" in active resource
> managers (if the
> >>>>>>>>> new config options are not configured). We can provide util
> methods in
> >>>>>>>>> TaskExecutorResourceUtils for such conversions, to keep all the
> >>>>>>>>> configuration logics at one place.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I agree that the problem is that the legacy option
> >>>>>>>>> 'taskmanager.heap.size' has different semantics for
> >>>>>>>>> standalone/container. We had it initially falling back to
> >>>>>>>>> 'taskmanager.memory.total-flink.size' but I changed that to align
> >>>>>>>>> it with the container cut-off. Now I see that this changes the
> >>>>>>>>> standalone setup.
> >>>>>>>>> +1 for supporting its backwards compatibility differently for
> >>>>>>>>> standalone/container setups.
> >>>>>>>>>
> >>>>>>>>
> >>>>>> Which configuration option will be set in Flink's default
> >>>>>> flink-conf.yaml? If we want to maintain the existing behaviour it
> would
> >>>>>> have to be the then deprecated taskmanager.heap.size config option.
> If we
> >>>>>> are ok with Yarn requesting slightly larger containers, then it
> could also
> >>>>>> be taskmanager.memory.total-flink.size.
> >>>>>>
> >>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - Mini Cluster tries to imitate exact ratio of memory pools as
> a
> >>>>>>>>>> standalone setup. I get the idea behind that, but I am
> wondering if it is
> >>>>>>>>>> the right approach here.
> >>>>>>>>>>     For example: I started a relatively large JVM (large heap
> >>>>>>>>>> size of 10 GB) as a test. With the current logic, the system
> tries to
> >>>>>>>>>> reserve an additional 6GB for managed memory which is more than
> there is
> >>>>>>>>>> memory left. When you see the error that no memory could be
> allocated, you
> >>>>>>>>>> need to understand the magic of how this is derived.
> >>>>>>>>>>     I am trying to think about this from the perspective of
> using
> >>>>>>>>>> "Flink as a Library", which the MiniCluster is close to.
> >>>>>>>>>>     When starting Flink out of a running process, we cannot
> >>>>>>>>>> assume that we are the only users of that process and that we
> can mold the
> >>>>>>>>>> process to our demands. I think a fix value for managed memory
> and network
> >>>>>>>>>> memory would feel more natural in such a setup than a mechanism
> that is
> >>>>>>>>>> tailored towards exclusive use of the process.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1 on having fixed values for managed / shuffle memory.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> also +1 for that, if the user has not specified any main options
> >>>>>>>>> to derive memory. We should also log this fixing of managed /
> >>>>>>>>> shuffle memory.
> >>>>>>>>> And just noticed, we could also sanity check framework and, if
> >>>>>>>>> explicitly configured, task heap against the available JVM heap,
> >>>>>>>>> and at least log inconsistencies.
> >>>>>>>>>
> >>>>>>>>>   - Some off-heap memory goes into direct memory, some does not.
> >>>>>>>>>> This confused me a bit. For example
> >>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into
> >>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size"
> is counted as
> >>>>>>>>>> native memory. Maybe we should rename the keys to reflect that.
> There is no
> >>>>>>>>>> one "off heap" memory type after all. Maybe use
> >>>>>>>>>> "taskmanager.memory.task.native: XXXmb" and
> >>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also
> >>>>>>>>> accounted in the max direct memory size limit. The confusion
> probably comes
> >>>>>>>>> from that "taskmanager.memory.framework.off-heap.size"
> explicitly mentioned
> >>>>>>>>> that in its description while
> "taskmanager.memory.task.off-heap.size"
> >>>>>>>>> didn't. That's actually because the framework off-heap memory is
> introduced
> >>>>>>>>> later in a separate commit. We should fix that.
> >>>>>>>>>
> >>>>>>>>> For framework / task off-heap memory, we do not differentiate
> >>>>>>>>> direct / native memory usage. That means the configure value for
> these two
> >>>>>>>>> options could be a mixture of direct / native memory. Since we
> do not know
> >>>>>>>>> the portion of direct memory out of the configured value, we have
> >>>>>>>>> to conservatively account it all into the max direct memory size
> limit.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==>  In that case, I am a bit confused. For the total size
> >>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory?
> It is a
> >>>>>>>>> difficult parameter, and the main reason to set it was (if I
> recall
> >>>>>>>>> correctly) to trigger GC based on direct memory allocation (to
> free heap
> >>>>>>>>> structures that then in turn release direct memory). If the
> limit is
> >>>>>>>>> anyways too high (because we also count native memory in there)
> such that
> >>>>>>>>> we can exceed the total process (container) memory, why do we
> set it then?*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I also always thought of it as providing an extra safety net for
> >>>>>>>>> direct allocations, but the GC aspect looks more important.
> >>>>>>>>>
> >>>>>>>>> +1 for fixing the docs for 'taskmanager.memory.task.off-heap.size'
> >>>>>>>>> and renaming it to 'direct', as this is what really happens,
> >>>>>>>>> if we want to support the direct limit more exactly than now.
> >>>>>>>>>
> >>>>>>>>> I also think that it is hard to separate direct / native memory
> >>>>>>>>> unless we introduce even more options.
> >>>>>>>>> If a user wants to keep the direct limit tight to a certain value
> >>>>>>>>> but also use native memory outside of it,
> >>>>>>>>> she would have to increase something else, like JVM overhead, to
> >>>>>>>>> account for it, and there is no better way.
> >>>>>>>>> Having more options to account for the native memory outside of
> >>>>>>>>> the direct limit complicates things but can be introduced later
> >>>>>>>>> if needed.
> >>>>>>>>>
> >>>>>>>>>   - What do you think about renaming
> >>>>>>>>>> "taskmanager.memory.total-flink.size" to
> "taskmanager.memory.flink.size"
> >>>>>>>>>> and "taskmanager.memory.total-process.size" to
> >>>>>>>>>> "taskmanager.memory.process.size" (or
> "taskmanager.memory.jvm.total"). I
> >>>>>>>>>> think these keys may be a bit less clumsy (dropping the
> "total-") without
> >>>>>>>>>> loss of expressiveness.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1 on this.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1 as well. Also an option:
> >>>>>>>>> 'taskmanager.memory.total-process.size’ ->
> >>>>>>>>> ‘taskmanager.memory.jvm.process.size’,
> >>>>>>>>> although it can be also mentioned in docs that we mean JVM
> process.
> >>>>>>>>>
> >>>>>>>>
> >>>>>> I'd be in favour of Stephan's proposal for the config keys as
> shorter
> >>>>>> is usually better and they are still descriptive enough. Between
> >>>>>> "taskmanager.memory.process.size" and
> "taskmanager.memory.jvm.total" I
> >>>>>> would slightly favour the first variant.
> >>>>>>
> >>>>>>>
> >>>>>>>>>   - The network memory keys are now called
> >>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is
> usually
> >>>>>>>>>> understood as a redistribution (random, or maybe by hash of
> key). As an
> >>>>>>>>>> example, there are many discussions about "shuffle join versus
> broadcast
> >>>>>>>>>> join", where "shuffle" is the synonym for "re-partitioning". We
> use that
> >>>>>>>>>> memory however for all network operations, like forward pipes,
> broadcasts,
> >>>>>>>>>> receiver-side buffering on checkpoints, etc. I find the name
> "*.shuffle.*"
> >>>>>>>>>> confusing, I am wondering if users would find that as well. So
> throwing in
> >>>>>>>>>> the suggestion to call the options
> "taskmanager.memory.network.*".
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On
> the
> >>>>>>>>> other hand, one can also argue that this part of memory is used
> by
> >>>>>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*"
> points more
> >>>>>>>>> directly to the shuffle service components.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit
> >>>>>>>>> incorrect, because it is not only doing shuffles. The
> >>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too
> >>>>>>>>> critical. This isn't super high priority for me, but if we want to
> >>>>>>>>> adjust it, better earlier than later.*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> This is also a somewhat controversial topic for me. Indeed, we
> >>>>>>>>> have always used 'network' for this concept of task data shuffling
> >>>>>>>>> over the network, and this can confuse existing users.
> >>>>>>>>>
> >>>>>>>>> On the other hand, for new users and in the long term, 'network'
> >>>>>>>>> can mislead into concluding that all network memory is managed
> >>>>>>>>> by this option.
> >>>>>>>>> Also, other types of shuffle might not directly deal with the
> >>>>>>>>> network at all.
> >>>>>>>>>
> >>>>>>>>> By calling it shuffle, we were somewhat biased by understanding
> >>>>>>>>> it in terms of map/reduce. This is rather an inter-task data
> >>>>>>>>> exchange.
> >>>>>>>>> Maybe then 'taskmanager.memory.shuffle.communication.*' or
> >>>>>>>>> 'taskmanager.memory.task.shuffle/communication/io/network.*'.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>  From a user's perspective I believe "taskmanager.memory.network"
> >>>>>> would be easier to understand as not everyone knows exactly what the
> >>>>>> shuffle service is. I see the point that it would be a bit
> imprecise as we
> >>>>>> can have different shuffle implementations but I would go with the
> ease of
> >>>>>> use/understanding here. Moreover, I think that we won't have many
> different
> >>>>>> shuffle service implementations in the foreseeable future.
> >>>>>>
> >>>>>>>
> >>>>>>>>>   - The descriptions for the "taskmanager.memory.jvm-overhead.*"
> >>>>>>>>>> keys say that it also accounts for I/O direct memory, but the
> parameter is
> >>>>>>>>>> not counted into the MaxDirectMemory parameter.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> True. Since we already have framework off-heap memory accounted
> >>>>>>>>> for ad hoc direct memory usages, accounting all of jvm-overhead
> also into
> >>>>>>>>> max direct memory limit seems not necessary. I would suggest to
> remove "I/O
> >>>>>>>>> direct memory" from the description, and explicitly mention that
> this
> >>>>>>>>> option does not account for direct memory and will not be
> accounted into
> >>>>>>>>> max direct memory limit. WDYT?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1
> >>>>>>>>>
> >>>>>>>>>   - Can make the new ConfigOptions strongly typed with the new
> >>>>>>>>>> configuration options. For example, directly use MemorySize
> typed options.
> >>>>>>>>>> That makes validation automatic and saves us from breaking the
> options
> >>>>>>>>>> later.
> >>>>>>>>>
> >>>>>>>>> +1. Wasn't aware of the new memory type config options.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1
> >>>>>>>>>
> >>>>>>>>> *==> Thanks. Do you need help with adjusting this?*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I would appreciate it.
> >>>>>>>>>
> >>>>>>>>> Also a small side note: we extensively use MemorySize in logs now,
> >>>>>>>>> but it might not always be readable, as its string representation
> >>>>>>>>> is only in bytes atm and is not reduced to kb/mb/etc. for large
> >>>>>>>>> byte values. We could have at least some .prettyPrint function to
> >>>>>>>>> use in logs.
> >>>>>>>>> And .fromMegabytes/etc factory methods would improve code
> >>>>>>>>> readability instead of .parse(int + "m").
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Andrey
> >>>>>>>>>
> >>>>>>>>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Xintong!
> >>>>>>>>>
> >>>>>>>>> Please find my answers inline:
> >>>>>>>>>
> >>>>>>>>>>   - "taskmanager.memory.size" (old main config option) is
> >>>>>>>>>>> replaced by "taskmanager.memory.total-process.size" which has
> a different
> >>>>>>>>>>> meaning in standalone setups. The old option did not subtract
> metaspace and
> >>>>>>>>>>> other overhead, while the new option does. That means that
> with the default
> >>>>>>>>>>> config, standalone clusters get quite a bit less memory.
> (independent of
> >>>>>>>>>>> managed memory going off heap).
> >>>>>>>>>>>     I am wondering if we could interpret
> >>>>>>>>>>> "taskmanager.memory.size" as the deprecated key for
> >>>>>>>>>>> "taskmanager.memory.total-flink.size". That would be in line
> with the old
> >>>>>>>>>>> mechanism (assuming managed memory is set to off heap).
> >>>>>>>>>>>     The effect would be that the container size on Yarn/Mesos
> >>>>>>>>>>> increases, because from "taskmanager.memory.total-flink.size",
> we need to
> >>>>>>>>>>> add overhead and metaspace to reach the total process size,
> rather than
> >>>>>>>>>>> cutting off memory. But if we want, we could even adjust for
> that in the
> >>>>>>>>>>> active resource manager, getting full backwards compatibility
> on that part.
> >>>>>>>>>>>     Curious to hear more thoughts there.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I believe you mean "taskmanager.heap.size".
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> Yes*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> I think the problem here is that the
> >>>>>>>>>> legacy "taskmanager.heap.size" was used differently in
> standalone setups
> >>>>>>>>>> and active yarn / mesos setups, and such different calculation
> logics and
> >>>>>>>>>> behaviors are exactly what we want to avoid with FLIP-49.
> Therefore, I'm
> >>>>>>>>>> not in favor of treating "taskmanager.memory.total-flink.size"
> differently
> >>>>>>>>>> for standalone and active setups.
> >>>>>>>>>>
> >>>>>>>>>> I think what we really want is probably mapping
> >>>>>>>>>> "taskmanager.heap.size" to different new config options in
> >>>>>>>>>> different setups. How about we mark "taskmanager.heap.size"
> >>>>>>>>>> as a deprecated key for neither
> >>>>>>>>>> "taskmanager.memory.total-process.size" nor
> >>>>>>>>>> "taskmanager.memory.total-flink.size"? Instead, we parse it
> >>>>>>>>>> (if explicitly configured) in startup scripts / active
> >>>>>>>>>> resource managers, and set the value to
> >>>>>>>>>> "taskmanager.memory.total-flink.size" in the scripts and
> >>>>>>>>>> "taskmanager.memory.total-process.size" in active resource
> >>>>>>>>>> managers (if the new config options are not configured). We
> >>>>>>>>>> can provide util methods in TaskExecutorResourceUtils for
> >>>>>>>>>> such conversions, to keep all the configuration logic in one
> >>>>>>>>>> place.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> This is pretty much what I meant as well (maybe my
> >>>>>>>>> description was not very clear), so +1 for that mechanism*
> >>>>>>>>>
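For readers following the thread, the mapping proposed above could look roughly like the sketch below. It is only an illustration in Python, not the actual Flink code: the function name and the "standalone" / "active" labels are made up, and it assumes that setting either of the new options explicitly disables the legacy mapping, which the thread does not spell out.

```python
LEGACY_KEY = "taskmanager.heap.size"
TOTAL_FLINK_KEY = "taskmanager.memory.total-flink.size"
TOTAL_PROCESS_KEY = "taskmanager.memory.total-process.size"


def map_legacy_heap_size(config, setup):
    """Return a copy of config with the legacy key mapped for the given setup.

    setup is "standalone" (startup scripts) or "active" (Yarn / Mesos
    resource managers); both labels are hypothetical names for this sketch.
    """
    if setup not in ("standalone", "active"):
        raise ValueError("unknown setup: %r" % (setup,))

    result = dict(config)
    legacy_value = result.get(LEGACY_KEY)

    # Assumption: an explicitly configured new option always wins, so the
    # legacy key is only consulted when neither new option is present.
    new_option_set = TOTAL_FLINK_KEY in result or TOTAL_PROCESS_KEY in result
    if legacy_value is None or new_option_set:
        return result

    # Standalone scripts treat the legacy value as total Flink memory,
    # active resource managers treat it as total process memory.
    target = TOTAL_FLINK_KEY if setup == "standalone" else TOTAL_PROCESS_KEY
    result[target] = legacy_value
    return result
```

Keeping the conversion in one function mirrors the suggestion to collect such logic in one place; the legacy key itself is left in the returned dictionary untouched.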
> >>>>>>>>>
> >>>>>>>>>   - Mini Cluster tries to imitate the exact ratio of memory
> >>>>>>>>>>> pools of a standalone setup. I get the idea behind that, but
> >>>>>>>>>>> I am wondering if it is the right approach here.
> >>>>>>>>>>>     For example: I started a relatively large JVM (large heap
> >>>>>>>>>>> size of 10 GB) as a test. With the current logic, the system
> >>>>>>>>>>> tries to reserve an additional 6GB for managed memory, which
> >>>>>>>>>>> is more than the memory that is left. When you see the error
> >>>>>>>>>>> that no memory could be allocated, you need to understand the
> >>>>>>>>>>> magic of how this is derived.
> >>>>>>>>>>>     I am trying to think about this from the perspective of
> >>>>>>>>>>> using "Flink as a Library", which the MiniCluster is close to.
> >>>>>>>>>>>     When starting Flink out of a running process, we cannot
> >>>>>>>>>>> assume that we are the only users of that process and that we
> >>>>>>>>>>> can mold the process to our demands. I think a fixed value for
> >>>>>>>>>>> managed memory and network memory would feel more natural in
> >>>>>>>>>>> such a setup than a mechanism that is tailored towards
> >>>>>>>>>>> exclusive use of the process.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> +1 on having fixed values for managed / shuffle memory.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> Cool, let's also see what Andrey and Till think here.*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>   - Some off-heap memory goes into direct memory, some does not.
> >>>>>>>>>>> This confused me a bit. For example
> >>>>>>>>>>> "taskmanager.memory.framework.off-heap.size" is counted into
> >>>>>>>>>>> MaxDirectMemory while "taskmanager.memory.task.off-heap.size"
> >>>>>>>>>>> is counted as native memory. Maybe we should rename the keys
> >>>>>>>>>>> to reflect that. There is no one "off heap" memory type after
> >>>>>>>>>>> all. Maybe use "taskmanager.memory.task.native: XXXmb" and
> >>>>>>>>>>> "taskmanager.memory.framework.direct: XXXmb" instead?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I believe "taskmanager.memory.task.off-heap.size" is also
> >>>>>>>>>> accounted in the max direct memory size limit. The confusion
> >>>>>>>>>> probably comes from the fact that
> >>>>>>>>>> "taskmanager.memory.framework.off-heap.size" explicitly
> >>>>>>>>>> mentions that in its description while
> >>>>>>>>>> "taskmanager.memory.task.off-heap.size" doesn't. That's
> >>>>>>>>>> actually because the framework off-heap memory was introduced
> >>>>>>>>>> later in a separate commit. We should fix that.
> >>>>>>>>>>
> >>>>>>>>>> For framework / task off-heap memory, we do not differentiate
> >>>>>>>>>> direct / native memory usage. That means the configured value
> >>>>>>>>>> for these two options could be a mixture of direct / native
> >>>>>>>>>> memory. Since we do not know the portion of direct memory out
> >>>>>>>>>> of the configured value, we have to conservatively account it
> >>>>>>>>>> all into the max direct memory size limit.
> >>>>>>>>>>
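As a rough illustration of the conservative accounting described above, the Python sketch below adds both off-heap options in full to the direct memory limit and leaves jvm-overhead out. The function name and the megabyte-based parameters are hypothetical. Counting the shuffle (network) memory toward the limit is an assumption of this sketch and is not stated in this part of the thread.

```python
def max_direct_memory_mb(framework_off_heap_mb, task_off_heap_mb,
                         shuffle_mb, jvm_overhead_mb):
    """Derive a max direct memory limit in megabytes (illustrative only)."""
    sizes = (framework_off_heap_mb, task_off_heap_mb, shuffle_mb, jvm_overhead_mb)
    if any(size < 0 for size in sizes):
        raise ValueError("memory sizes must not be negative")

    # Framework and task off-heap may each mix direct and native memory.
    # The direct share is unknown, so both are counted in full.
    limit = framework_off_heap_mb + task_off_heap_mb

    # Assumption of this sketch: shuffle / network buffers count as well.
    limit += shuffle_mb

    # jvm_overhead_mb is accepted only to make its exclusion explicit;
    # it is deliberately not added to the limit.
    return limit
```

The limit is therefore an upper bound: if part of the off-heap budget is actually used as native memory, the real direct memory usage stays below it.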
> >>>>>>>>>
> >>>>>>>>> *==>  In that case, I am a bit confused. For the total size
> >>>>>>>>> calculation, it is fine. But why do we then set MaxDirectMemory?
> >>>>>>>>> It is a difficult parameter, and the main reason to set it was
> >>>>>>>>> (if I recall correctly) to trigger GC based on direct memory
> >>>>>>>>> allocation (to free heap structures that then in turn release
> >>>>>>>>> direct memory). If the limit is anyways too high (because we
> >>>>>>>>> also count native memory in there) such that we can exceed the
> >>>>>>>>> total process (container) memory, why do we set it then?*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>   - The network memory keys are now called
> >>>>>>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is
> >>>>>>>>>>> usually understood as a redistribution (random, or maybe by
> >>>>>>>>>>> hash of key). As an example, there are many discussions about
> >>>>>>>>>>> "shuffle join versus broadcast join", where "shuffle" is the
> >>>>>>>>>>> synonym for "re-partitioning". We use that memory however for
> >>>>>>>>>>> all network operations, like forward pipes, broadcasts,
> >>>>>>>>>>> receiver-side buffering on checkpoints, etc. I find the name
> >>>>>>>>>>> "*.shuffle.*" confusing, I am wondering if users would find
> >>>>>>>>>>> that as well. So throwing in the suggestion to call the
> >>>>>>>>>>> options "taskmanager.memory.network.*".
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On
> >>>>>>>>>> the other hand, one can also argue that this part of memory is
> >>>>>>>>>> used by ShuffleEnvironment, and the key
> >>>>>>>>>> "taskmanager.memory.shuffle.*" points more directly to the
> >>>>>>>>>> shuffle service components.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> In that case, the name "Shuffle Environment" may be a bit
> >>>>>>>>> incorrect, because it does more than just shuffles. The
> >>>>>>>>> ShuffleEnvironment is also more internal, so the name is not too
> >>>>>>>>> critical. This isn't super high priority for me, but if we want
> >>>>>>>>> to adjust it, better earlier than later.*
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>    - The descriptions for the "taskmanager.memory.jvm-overhead.*"
> >>>>>>>>> keys say that it also accounts for I/O direct memory, but the
> >>>>>>>>> parameter is not counted into the MaxDirectMemory parameter.
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> True. Since we already have framework off-heap memory accounted
> >>>>>>>>>> for ad hoc direct memory usages, accounting all of jvm-overhead
> >>>>>>>>>> also into max direct memory limit seems not necessary. I would
> >>>>>>>>>> suggest removing "I/O direct memory" from the description, and
> >>>>>>>>>> explicitly mentioning that this option does not account for
> >>>>>>>>>> direct memory and will not be accounted into max direct memory
> >>>>>>>>>> limit. WDYT?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> Sounds good. *
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>>   - We can make the new ConfigOptions strongly typed with the new
> >>>>>>>>>>> configuration options. For example, directly use MemorySize
> >>>>>>>>>>> typed options. That makes validation automatic and saves us
> >>>>>>>>>>> from breaking the options later.
> >>>>>>>>>>
> >>>>>>>>>> +1. Wasn't aware of the new memory type config options.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> *==> Thanks. Do you need help with adjusting this?*
> >>>>>>>>>
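To illustrate why a typed memory option makes validation automatic, here is a minimal Python sketch of a memory size parser. It is a hypothetical stand-in, not Flink's MemorySize class: the function name, the accepted unit names, and the rule that a unit is mandatory are all assumptions of this sketch.

```python
import re

# Assumed unit table for this sketch; binary multiples of a byte.
_UNITS = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3}
_PATTERN = re.compile(r"^\s*(\d+)\s*([a-z]+)\s*$")


def parse_memory_size(text):
    """Parse a string such as "512mb" into a number of bytes."""
    match = _PATTERN.match(text.lower())
    if match is None:
        raise ValueError("not a memory size: %r" % (text,))

    number, unit = match.groups()
    if unit not in _UNITS:
        raise ValueError("unknown memory unit: %r" % (unit,))
    return int(number) * _UNITS[unit]
```

With a parser like this attached to the option type, a malformed value fails when the configuration is read rather than somewhere deep in the memory calculation.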
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Stephan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
>
