>
> +1 to not have more options for off-heap memory, that would get confusing
> fast. We can keep the name "off-heap" for both task and framework, as long
> as they mean the same thing: native plus direct, and fully counted into
> MaxDirectMemory. I would suggest to update the config descriptions then to
> reflect that.
>
True, this should be explained in the config descriptions.
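
As a minimal sketch of the semantics the descriptions would need to spell
out, assuming only what was agreed above (framework and task off-heap are both
"native plus direct" and are fully counted into MaxDirectMemory): the class and
method names below are hypothetical and not part of Flink, and any other pools
that may also end up in the limit are deliberately left out.

```java
// Hypothetical illustration, not Flink code: derives the direct memory limit
// from the two off-heap pools discussed in this thread.
final class DirectMemoryLimitSketch {

    /**
     * Both pools may mix direct and native memory. Since the direct share is
     * unknown, the full configured size of each is counted into the limit.
     */
    static long maxDirectMemoryBytes(long frameworkOffHeapBytes, long taskOffHeapBytes) {
        if (frameworkOffHeapBytes < 0 || taskOffHeapBytes < 0) {
            throw new IllegalArgumentException("memory sizes must not be negative");
        }
        return Math.addExact(frameworkOffHeapBytes, taskOffHeapBytes);
    }

    /** Formats the limit as the standard HotSpot flag, with the size in bytes. */
    static String jvmFlag(long maxDirectMemoryBytes) {
        return "-XX:MaxDirectMemorySize=" + maxDirectMemoryBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Example values only: 128 MB framework off-heap, 64 MB task off-heap.
        long limit = maxDirectMemoryBytes(128 * mb, 64 * mb);
        System.out.println(jvmFlag(limit));
    }
}
```

The point for the descriptions is that the resulting limit is an upper bound:
any native share of the two options makes it looser than the real direct usage.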

> Which configuration option will be set in Flink's default flink-conf.yaml?
> If we want to maintain the existing behaviour it would have to be the then
> deprecated taskmanager.heap.size config option. If we are ok with Yarn
> requesting slightly larger containers, then it could also
> be taskmanager.memory.total-flink.size.
>
Good point. Currently, we have "taskmanager.memory.total-process.size". In
order to preserve the previous behavior, we need to have
"taskmanager.heap.size" so it can be mapped to different new options in
standalone / active setups.
I think we can have the deprecated "taskmanager.heap.size" in the default
flink-conf.yaml, and also have the
new "taskmanager.memory.total-process.size" in a commented line. We can
explain how the deprecated config option behaves differently in the
comments, so that users can switch to the new config options if they want to.
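
To make the intended mapping concrete, here is a rough sketch under the
assumptions discussed in this thread: the legacy key is only honored when
neither new option is configured, and it maps to total-flink size in
standalone setups and to total-process size in active (Yarn / Mesos) setups.
The class, enum and method names are hypothetical; this is not the actual
TaskExecutorResourceUtils API, and it uses a plain Map instead of Flink's
Configuration to stay self-contained.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink code.
final class LegacyHeapSizeMapping {

    static final String LEGACY_HEAP = "taskmanager.heap.size";
    static final String TOTAL_FLINK = "taskmanager.memory.total-flink.size";
    static final String TOTAL_PROCESS = "taskmanager.memory.total-process.size";

    /** STANDALONE: startup scripts. ACTIVE: Yarn / Mesos resource managers. */
    enum Setup { STANDALONE, ACTIVE }

    /** Returns a copy of the config with the legacy key mapped, if applicable. */
    static Map<String, String> mapLegacyHeapSize(Map<String, String> conf, Setup setup) {
        Map<String, String> result = new TreeMap<>(conf);
        String legacyValue = conf.get(LEGACY_HEAP);
        boolean newOptionConfigured =
                conf.containsKey(TOTAL_FLINK) || conf.containsKey(TOTAL_PROCESS);
        // Explicitly configured new options always win over the legacy key.
        if (legacyValue == null || newOptionConfigured) {
            return result;
        }
        String target = (setup == Setup.STANDALONE) ? TOTAL_FLINK : TOTAL_PROCESS;
        result.put(target, legacyValue);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new TreeMap<>();
        conf.put(LEGACY_HEAP, "1024m");
        System.out.println(mapLegacyHeapSize(conf, Setup.STANDALONE));
        System.out.println(mapLegacyHeapSize(conf, Setup.ACTIVE));
    }
}
```

With the deprecated key in the default flink-conf.yaml, this keeps the
previous behavior in both setups, while uncommenting the new option would
bypass the mapping entirely.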

Thank you~

Xintong Song



On Sat, Dec 21, 2019 at 1:00 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the feedback and good discussion everyone. I left some comments
> inline.
>
> On Fri, Dec 20, 2019 at 1:59 PM Stephan Ewen <se...@apache.org> wrote:
>
>> +1 to not have more options for off-heap memory, that would get confusing
>> fast. We can keep the name "off-heap" for both task and framework, as long
>> as they mean the same thing: native plus direct, and fully counted into
>> MaxDirectMemory. I would suggest to update the config descriptions then to
>> reflect that.
>>
>>
>>
>> On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Regarding the framework/task direct/native memory options, I tend to
>>> see it differently. I'm in favor of keeping "*.off-heap.size" for the
>>> config option keys.
>>>
>>>    - It's not necessary IMO to expose the different concepts of direct
>>>    / native memory to the users.
>>>    - I would avoid introducing more options for native memory if
>>>    possible. Taking fine grained resource management and dynamic slot
>>>    allocation into consideration, that also means introducing more fields
>>>    into ResourceSpec / ResourceProfile.
>>>    - My gut feeling is that having a relatively loose MaxDirectMemory
>>>    should not be a big problem.
>>>       - In most cases, the task / framework off-heap memory should be
>>>       mainly (if not all) direct memory, so the difference between derived
>>>       MaxDirectMemory and the ideal direct memory limit should not be too
>>>       much.
>>>       - We do not have a good way to know the exact size needed for jvm
>>>       overhead / metaspace and framework / task off-heap memory, thus having
>>>       to conservatively reserve slightly more memory than what is actually
>>>       needed. Such reserved but unused memory can cover for the small
>>>       MaxDirectMemory error.
>>>       - MaxDirectMemory is not the only way to trigger full gc. We
>>>       still have heap activities that can also trigger the gc.
>>>
>>> Regarding the memory type config options, I've looked into the latest
>>> ConfigOptions changes. I think it shouldn't be too complicated to change
>>> the config options to use memory type, and I can handle it maybe during
>>> your vacations.
>>>
>>>
>>> Also agree with improving MemorySize logging and parsing. This should
>>> not be a blocker that has to be done in 1.10. I would say we finish the
>>> other work (testability, documentation and the items discussed in this
>>> thread) first, and get to this only if we have time.
>>>
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin <
>>> azagrebin.apa...@gmail.com> wrote:
>>>
>>>> Hi Stephan and Xintong,
>>>>
>>>> Thanks for the further FLIP-49 feedback.
>>>>
>>>>   - "taskmanager.memory.size" (old main config option) is replaced by
>>>>> "taskmanager.memory.total-process.size" which has a different meaning in
>>>>> standalone setups. The old option did not subtract metaspace and other
>>>>> overhead, while the new option does. That means that with the default
>>>>> config, standalone clusters get quite a bit less memory. (independent of
>>>>> managed memory going off heap).
>>>>>     I am wondering if we could interpret "taskmanager.memory.size" as
>>>>> the deprecated key for "taskmanager.memory.total-flink.size". That would
>>>>> be in line with the old mechanism (assuming managed memory is set to off
>>>>> heap).
>>>>>     The effect would be that the container size on Yarn/Mesos
>>>>> increases, because from "taskmanager.memory.total-flink.size", we need to
>>>>> add overhead and metaspace to reach the total process size, rather than
>>>>> cutting off memory. But if we want, we could even adjust for that in the
>>>>> active resource manager, getting full backwards compatibility on that
>>>>> part.
>>>>>     Curious to hear more thoughts there.
>>>>
>>>>
>>>> I believe you mean "taskmanager.heap.size".
>>>>
>>>> I think the problem here is that the legacy "taskmanager.heap.size" was
>>>> used differently in standalone setups and active yarn / mesos setups, and
>>>> such different calculation logics and behaviors are exactly what we want to
>>>> avoid with FLIP-49. Therefore, I'm not in favor of treating
>>>> "taskmanager.memory.total-flink.size" differently for standalone and active
>>>> setups.
>>>>
>>>> I think what we really want is probably mapping "taskmanager.heap.size"
>>>> to different new config options in different setups. How about we
>>>> mark "taskmanager.heap.size" as a deprecated key for neither
>>>> "taskmanager.memory.total-process.size" nor
>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>>>> configured) in startup scripts / active resource managers, and set the
>>>> value to "taskmanager.memory.total-flink.size" in the scripts and
>>>> "taskmanager.memory.total-process.size" in active resource managers (if the
>>>> new config options are not configured). We can provide util methods in
>>>> TaskExecutorResourceUtils for such conversions, to keep all the
>>>> configuration logic in one place.
>>>>
>>>>
>>>> I agree that the problem is that the legacy option
>>>> ‘taskmanager.heap.size’ has different semantics for standalone/container.
>>>> We had it initially falling back to 'taskmanager.memory.total-flink.size’
>>>> but I changed that to align it with container cut-off. Now I see that this
>>>> changes the standalone setup.
>>>> +1 for supporting its backwards compatibility differently for
>>>> standalone/container setups.
>>>>
>>>
> Which configuration option will be set in Flink's default flink-conf.yaml?
> If we want to maintain the existing behaviour it would have to be the then
> deprecated taskmanager.heap.size config option. If we are ok with Yarn
> requesting slightly larger containers, then it could also
> be taskmanager.memory.total-flink.size.
>
>>
>>>>
>>>>   - Mini Cluster tries to imitate exact ratio of memory pools as a
>>>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>>>> the right approach here.
>>>>>     For example: I started a relatively large JVM (large heap size of
>>>>> 10 GB) as a test. With the current logic, the system tries to reserve an
>>>>> additional 6GB for managed memory which is more than there is memory left.
>>>>> When you see the error that no memory could be allocated, you need to
>>>>> understand the magic of how this is derived.
>>>>>     I am trying to think about this from the perspective of using
>>>>> "Flink as a Library", which the MiniCluster is close to.
>>>>>     When starting Flink out of a running process, we cannot assume
>>>>> that we are the only users of that process and that we can mold the
>>>>> process to our demands. I think a fixed value for managed memory and
>>>>> network memory would feel more natural in such a setup than a mechanism
>>>>> that is tailored towards exclusive use of the process.
>>>>
>>>>
>>>> +1 on having fixed values for managed / shuffle memory.
>>>>
>>>>
>>>> also +1 for that, if the user has not specified any main options to derive
>>>> memory. We should also log this fixing of managed / shuffle memory.
>>>> And just noticed, we could also sanity check the framework heap and, if
>>>> explicitly configured, the task heap against the available JVM heap, and
>>>> at least log inconsistencies.
>>>>
>>>>   - Some off-heap memory goes into direct memory, some does not. This
>>>>> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
>>>>> is counted into MaxDirectMemory while
>>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>>>> we should rename the keys to reflect that. There is no one "off heap"
>>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>>>
>>>>
>>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted in
>>>> the max direct memory size limit. The confusion probably comes from that
>>>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in
>>>> its description while "taskmanager.memory.task.off-heap.size" didn't.
>>>> That's actually because the framework off-heap memory is introduced later
>>>> in a separate commit. We should fix that.
>>>>
>>>> For framework / task off-heap memory, we do not differentiate direct /
>>>> native memory usage. That means the configured value for these two options
>>>> could be a mixture of direct / native memory. Since we do not know the
>>>> portion of direct memory out of the configured value, we have
>>>> to conservatively account it all into the max direct memory size limit.
>>>>
>>>>
>>>> *==>  In that case, I am a bit confused. For the total size
>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It is a
>>>> difficult parameter, and the main reason to set it was (if I recall
>>>> correctly) to trigger GC based on direct memory allocation (to free heap
>>>> structures that then in turn release direct memory). If the limit is
>>>> anyways too high (because we also count native memory in there) such that
>>>> we can exceed the total process (container) memory, why do we set it then?*
>>>>
>>>>
>>>> I also always thought of it as providing an additional safety net for
>>>> direct allocations, but the GC aspect looks more important.
>>>>
>>>> +1 for fixing the docs for 'taskmanager.memory.task.off-heap.size’ and
>>>> renaming it to ‘direct', as this is what really happens,
>>>> if we want to support a more exact direct limit than now.
>>>>
>>>> I also think that it is hard to separate direct / native memory unless
>>>> we introduce even more options.
>>>> If user wants to keep the direct limit tight to a certain value but
>>>> also use native memory outside of it,
>>>> she would have to increase something else, like JVM overhead to account
>>>> for it and there is no other better way.
>>>> Having more options to account for the native memory outside of direct
>>>> limit complicates things but can be introduced later if needed.
>>>>
>>>>   - What do you think about renaming
>>>>> "taskmanager.memory.total-flink.size" to "taskmanager.memory.flink.size"
>>>>> and "taskmanager.memory.total-process.size" to
>>>>> "taskmanager.memory.process.size" (or "taskmanager.memory.jvm.total"). I
>>>>> think these keys may be a bit less clumsy (dropping the "total-") without
>>>>> loss of expressiveness.
>>>>
>>>>
>>>> +1 on this.
>>>>
>>>>
>>>> +1 as well. Also an option: 'taskmanager.memory.total-process.size’ ->
>>>> ‘taskmanager.memory.jvm.process.size’,
>>>> although it can be also mentioned in docs that we mean JVM process.
>>>>
>>>
> I'd be in favour of Stephan's proposal for the config keys as shorter is
> usually better and they are still descriptive enough. Between
> "taskmanager.memory.process.size" and "taskmanager.memory.jvm.total" I
> would slightly favour the first variant.
>
>>
>>>>   - The network memory keys are now called
>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>>>> understood as a redistribution (random, or maybe by hash of key). As an
>>>>> example, there are many discussions about "shuffle join versus broadcast
>>>>> join", where "shuffle" is the synonym for "re-partitioning". We use that
>>>>> memory however for all network operations, like forward pipes, broadcasts,
>>>>> receiver-side buffering on checkpoints, etc. I find the name "*.shuffle.*"
>>>>> confusing, I am wondering if users would find that as well. So throwing in
>>>>> the suggestion to call the options "taskmanager.memory.network.*".
>>>>
>>>>
>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the
>>>> other hand, one can also argue that this part of memory is used by
>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>>>> directly to the shuffle service components.
>>>>
>>>>
>>>> *==> In that case, the name "Shuffle Environment" may be a bit
>>>> incorrect, because it does more than just shuffles. The
>>>> ShuffleEnvironment is also more internal, so the name is not too critical.
>>>> This isn't super high priority for me, but if we want to adjust it, better
>>>> earlier than later.*
>>>>
>>>>
>>>> This is also a bit of a controversial topic for me. Indeed, we have always
>>>> used ’network’ for this concept of task data shuffling over the network and
>>>> this can confuse existing users.
>>>>
>>>> On the other hand, for new users and in the long term, ’network’ can
>>>> mislead them into concluding that all network memory is managed by this
>>>> option. Also, other types of shuffle might not directly deal with the
>>>> network at all.
>>>>
>>>> By calling it shuffle, we were somewhat biased by understanding it in
>>>> terms of map/reduce. This is rather an inter-task data exchange.
>>>> Maybe then 'taskmanager.memory.shuffle.communication.*’ or
>>>> ‘taskmanager.memory.task.shuffle/communication/io/network.*’.
>>>>
>>>
>  From a user's perspective I believe "taskmanager.memory.network" would be
> easier to understand as not everyone knows exactly what the shuffle service
> is. I see the point that it would be a bit imprecise as we can have
> different shuffle implementations but I would go with the ease of
> use/understanding here. Moreover, I think that we won't have many different
> shuffle service implementations in the foreseeable future.
>
>>
>>>>   - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys
>>>>> say that it also accounts for I/O direct memory, but the parameter is not
>>>>> counted into the MaxDirectMemory parameter.
>>>>
>>>>
>>>> True. Since we already have framework off-heap memory accounted for ad
>>>> hoc direct memory usages, accounting all of jvm-overhead also into max
>>>> direct memory limit seems not necessary. I would suggest to remove "I/O
>>>> direct memory" from the description, and explicitly mention that this
>>>> option does not account for direct memory and will not be accounted into
>>>> max direct memory limit. WDYT?
>>>>
>>>>
>>>> +1
>>>>
>>>>   - Can make the new ConfigOptions strongly typed with the new
>>>>> configuration options. For example, directly use MemorySize typed options.
>>>>> That makes validation automatic and saves us from breaking the options
>>>>> later.
>>>>
>>>> +1. Wasn't aware of the new memory type config options.
>>>>
>>>>
>>>> +1
>>>>
>>>> *==> Thanks. Do you need help with adjusting this?*
>>>>
>>>>
>>>> I would appreciate it.
>>>>
>>>> Also small side note, we extensively use MemorySize in logs now but it
>>>> might not always be readable, as its string representation is only in
>>>> bytes atm and is not reduced to kb/mb/etc. for large byte values. We
>>>> could have at least some .prettyPrint function to use in logs.
>>>> And .fromMegabytes/etc factory methods would improve code readability
>>>> instead of .parse(int + “m”).
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>> Hi Xintong!
>>>>
>>>> Please find my answers inline:
>>>>
>>>>>   - "taskmanager.memory.size" (old main config option) is replaced by
>>>>>> "taskmanager.memory.total-process.size" which has a different meaning in
>>>>>> standalone setups. The old option did not subtract metaspace and other
>>>>>> overhead, while the new option does. That means that with the default
>>>>>> config, standalone clusters get quite a bit less memory. (independent of
>>>>>> managed memory going off heap).
>>>>>>     I am wondering if we could interpret "taskmanager.memory.size" as
>>>>>> the deprecated key for "taskmanager.memory.total-flink.size". That would
>>>>>> be in line with the old mechanism (assuming managed memory is set to off
>>>>>> heap).
>>>>>>     The effect would be that the container size on Yarn/Mesos
>>>>>> increases, because from "taskmanager.memory.total-flink.size", we need to
>>>>>> add overhead and metaspace to reach the total process size, rather than
>>>>>> cutting off memory. But if we want, we could even adjust for that in the
>>>>>> active resource manager, getting full backwards compatibility on that
>>>>>> part.
>>>>>>     Curious to hear more thoughts there.
>>>>>
>>>>>
>>>>> I believe you mean "taskmanager.heap.size".
>>>>>
>>>>
>>>> *==> Yes*
>>>>
>>>>
>>>>
>>>>> I think the problem here is that the legacy "taskmanager.heap.size"
>>>>> was used differently in standalone setups and active yarn / mesos setups,
>>>>> and such different calculation logics and behaviors are exactly what we
>>>>> want to avoid with FLIP-49. Therefore, I'm not in favor of treating
>>>>> "taskmanager.memory.total-flink.size" differently for standalone and
>>>>> active setups.
>>>>>
>>>>> I think what we really want is probably
>>>>> mapping "taskmanager.heap.size" to different new config options in
>>>>> different setups. How about we mark "taskmanager.heap.size" as a
>>>>> deprecated key for neither "taskmanager.memory.total-process.size" nor
>>>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>>>>> configured) in startup scripts / active resource managers, and set the
>>>>> value to "taskmanager.memory.total-flink.size" in the scripts and
>>>>> "taskmanager.memory.total-process.size" in active resource managers (if
>>>>> the new config options are not configured). We can provide util methods in
>>>>> TaskExecutorResourceUtils for such conversions, to keep all the
>>>>> configuration logic in one place.
>>>>>
>>>>
>>>> *==> This is pretty much what I meant as well (maybe my description was
>>>> not very clear), so +1 for that mechanism*
>>>>
>>>>
>>>>   - Mini Cluster tries to imitate exact ratio of memory pools as a
>>>>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>>>>> the right approach here.
>>>>>>     For example: I started a relatively large JVM (large heap size of
>>>>>> 10 GB) as a test. With the current logic, the system tries to reserve an
>>>>>> additional 6GB for managed memory which is more than there is memory
>>>>>> left. When you see the error that no memory could be allocated, you
>>>>>> need to understand the magic of how this is derived.
>>>>>>     I am trying to think about this from the perspective of using
>>>>>> "Flink as a Library", which the MiniCluster is close to.
>>>>>>     When starting Flink out of a running process, we cannot assume
>>>>>> that we are the only users of that process and that we can mold the
>>>>>> process to our demands. I think a fixed value for managed memory and
>>>>>> network memory would feel more natural in such a setup than a mechanism
>>>>>> that is tailored towards exclusive use of the process.
>>>>>
>>>>>
>>>>> +1 on having fixed values for managed / shuffle memory.
>>>>>
>>>>
>>>> *==> Cool, let's also see what Andrey and Till think here.*
>>>>
>>>>
>>>>
>>>>>   - Some off-heap memory goes into direct memory, some does not. This
>>>>>> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
>>>>>> is counted into MaxDirectMemory while
>>>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>>>>> we should rename the keys to reflect that. There is no one "off heap"
>>>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>>>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>>>>
>>>>>
>>>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted in
>>>>> the max direct memory size limit. The confusion probably comes from that
>>>>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in
>>>>> its description while "taskmanager.memory.task.off-heap.size" didn't.
>>>>> That's actually because the framework off-heap memory is introduced later
>>>>> in a separate commit. We should fix that.
>>>>>
>>>>> For framework / task off-heap memory, we do not differentiate direct /
>>>>> native memory usage. That means the configured value for these two options
>>>>> could be a mixture of direct / native memory. Since we do not know the
>>>>> portion of direct memory out of the configured value, we have
>>>>> to conservatively account it all into the max direct memory size limit.
>>>>>
>>>>
>>>> *==>  In that case, I am a bit confused. For the total size
>>>> calculation, it is fine. But why do we then set MaxDirectMemory? It is a
>>>> difficult parameter, and the main reason to set it was (if I recall
>>>> correctly) to trigger GC based on direct memory allocation (to free heap
>>>> structures that then in turn release direct memory). If the limit is
>>>> anyways too high (because we also count native memory in there) such that
>>>> we can exceed the total process (container) memory, why do we set it then?*
>>>>
>>>>
>>>>   - The network memory keys are now called
>>>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>>>>> understood as a redistribution (random, or maybe by hash of key). As an
>>>>>> example, there are many discussions about "shuffle join versus broadcast
>>>>>> join", where "shuffle" is the synonym for "re-partitioning". We use that
>>>>>> memory however for all network operations, like forward pipes,
>>>>>> broadcasts, receiver-side buffering on checkpoints, etc. I find the name
>>>>>> "*.shuffle.*" confusing, I am wondering if users would find that as
>>>>>> well. So throwing in the suggestion to call the options
>>>>>> "taskmanager.memory.network.*".
>>>>>
>>>>>
>>>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the
>>>>> other hand, one can also argue that this part of memory is used by
>>>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>>>>> directly to the shuffle service components.
>>>>>
>>>>
>>>> *==> In that case, the name "Shuffle Environment" may be a bit
>>>> incorrect, because it does more than just shuffles. The
>>>> ShuffleEnvironment is also more internal, so the name is not too critical.
>>>> This isn't super high priority for me, but if we want to adjust it, better
>>>> earlier than later.*
>>>>
>>>>
>>>>    - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys
>>>> say that it also accounts for I/O direct memory, but the parameter is not
>>>> counted into the MaxDirectMemory parameter.
>>>>
>>>>>
>>>>> True. Since we already have framework off-heap memory accounted for ad
>>>>> hoc direct memory usages, accounting all of jvm-overhead also into max
>>>>> direct memory limit seems not necessary. I would suggest to remove "I/O
>>>>> direct memory" from the description, and explicitly mention that this
>>>>> option does not account for direct memory and will not be accounted into
>>>>> max direct memory limit. WDYT?
>>>>>
>>>>
>>>> *==> Sounds good. *
>>>>
>>>>
>>>>
>>>>>   - Can make the new ConfigOptions strongly typed with the new
>>>>>> configuration options. For example, directly use MemorySize typed options.
>>>>>> That makes validation automatic and saves us from breaking the options
>>>>>> later.
>>>>>
>>>>> +1. Wasn't aware of the new memory type config options.
>>>>>
>>>>
>>>> *==> Thanks. Do you need help with adjusting this?*
>>>>
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>>
>>>>
