+1 for not having more options for off-heap memory; that would get
confusing fast. We can keep the name "off-heap" for both task and
framework memory, as long as they mean the same thing: native plus direct,
fully counted into MaxDirectMemory. I would suggest updating the config
descriptions to reflect that.
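
To make the agreed semantics concrete, here is a minimal sketch of the
derivation (illustrative values only; the option names are from FLIP-49,
but this is not the actual Flink code):

    // Both off-heap options plus shuffle memory count fully into
    // -XX:MaxDirectMemorySize (uses org.apache.flink.configuration.MemorySize).
    MemorySize frameworkOffHeap = MemorySize.parse("128m"); // taskmanager.memory.framework.off-heap.size
    MemorySize taskOffHeap = MemorySize.parse("0m");        // taskmanager.memory.task.off-heap.size
    MemorySize shuffle = MemorySize.parse("64m");           // taskmanager.memory.shuffle.*

    long maxDirectMemoryBytes =
        frameworkOffHeap.getBytes() + taskOffHeap.getBytes() + shuffle.getBytes();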



On Fri, Dec 20, 2019 at 1:03 PM Xintong Song <tonysong...@gmail.com> wrote:

> Regarding the framework/task direct/native memory options, I tend to
> think about it differently. I'm in favor of keeping "*.off-heap.size" for
> the config option keys.
>
>    - It's not necessary IMO to expose the different concepts of direct /
>    native memory to the users.
>    - I would avoid introducing more options for native memory if
>    possible. Taking fine-grained resource management and dynamic slot
>    allocation into consideration, that would also mean introducing more
>    fields into ResourceSpec / ResourceProfile.
>    - My gut feeling is that having a relatively loose MaxDirectMemory
>    should not be a big problem.
>       - In most cases, the task / framework off-heap memory should be
>       mainly (if not entirely) direct memory, so the difference between
>       the derived MaxDirectMemory and the ideal direct memory limit
>       should not be too large.
>       - We do not have a good way to know the exact sizes needed for JVM
>       overhead / metaspace and framework / task off-heap memory, so we
>       have to conservatively reserve slightly more memory than is
>       actually needed. Such reserved-but-unused memory can cover the
>       small MaxDirectMemory error.
>       - MaxDirectMemory is not the only way to trigger full GC. We still
>       have heap activities that can also trigger GC.
>
> Regarding the memory type config options, I've looked into the latest
> ConfigOptions changes. I think it shouldn't be too complicated to change
> the config options to use the memory type, and I can handle it, maybe
> during your vacation.
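>
> For reference, a memory-typed option would look roughly like the sketch
> below (assuming the new ConfigOptions builder exposes a memoryType() step;
> exact method names may differ):
>
>     // Sketch using org.apache.flink.configuration.{ConfigOption,
>     // ConfigOptions, MemorySize}; validation and parsing come for free.
>     ConfigOption<MemorySize> taskHeap =
>         ConfigOptions.key("taskmanager.memory.task.heap.size")
>             .memoryType()
>             .noDefaultValue()
>             .withDescription("Task heap memory size for the TaskExecutor.");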
>
>
> Also agree with improving MemorySize logging and parsing. This should not
> be a blocker that has to be done in 1.10. I would say we finish the other
> work (testability, documentation, and the items discussed in this thread)
> first, and get to this only if we have time.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Dec 20, 2019 at 6:07 PM Andrey Zagrebin <
> azagrebin.apa...@gmail.com> wrote:
>
>> Hi Stephan and Xintong,
>>
>> Thanks for the further FLIP-49 feedbacks.
>>
>>   - "taskmanager.memory.size" (old main config option) is replaced by
>>> "taskmanager.memory.total-process.size" which has a different meaning in
>>> standalone setups. The old option did not subtract metaspace and other
>>> overhead, while the new option does. That means that with the default
>>> config, standalone clusters get quite a bit less memory. (independent of
>>> managed memory going off heap).
>>>     I am wondering if we could interpret "taskmanager.memory.size" as
>>> the deprecated key for "taskmanager.memory.total-flink.size". That would be
>>> in line with the old mechanism (assuming managed memory is set to off heap).
>>>     The effect would be that the container size on Yarn/Mesos increases,
>>> because from "taskmanager.memory.total-flink.size", we need to add overhead
>>> and metaspace to reach the total process size, rather than cutting off
>>> memory. But if we want, we could even adjust for that in the active
>>> resource manager, getting full backwards compatibility on that part.
>>>     Curious to hear more thoughts there.
>>
>>
>> I believe you mean "taskmanager.heap.size".
>>
>> I think the problem here is that the legacy "taskmanager.heap.size" was
>> used differently in standalone setups and active yarn / mesos setups, and
>> such different calculation logic and behavior is exactly what we want to
>> avoid with FLIP-49. Therefore, I'm not in favor of treating
>> "taskmanager.memory.total-flink.size" differently for standalone and
>> active setups.
>>
>> I think what we really want is probably to map "taskmanager.heap.size"
>> to different new config options in different setups. How about we do not
>> mark "taskmanager.heap.size" as a deprecated key for either
>> "taskmanager.memory.total-process.size" or
>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>> configured) in the startup scripts / active resource managers, and set the
>> value to "taskmanager.memory.total-flink.size" in the scripts and to
>> "taskmanager.memory.total-process.size" in active resource managers (if
>> the new config options are not configured). We can provide util methods in
>> TaskExecutorResourceUtils for such conversions, to keep all the
>> configuration logic in one place.
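>>
>> A rough sketch of such a util method (hypothetical names, not the actual
>> TaskExecutorResourceUtils code; uses
>> org.apache.flink.configuration.Configuration):
>>
>>     // Hypothetical: map the legacy key onto the appropriate new option,
>>     // unless the new options are configured explicitly.
>>     static void mapLegacyHeapSize(Configuration conf, boolean activeSetup) {
>>         String legacy = conf.getString("taskmanager.heap.size", null);
>>         if (legacy == null) {
>>             return; // legacy key not configured, nothing to map
>>         }
>>         String newKey = activeSetup
>>             ? "taskmanager.memory.total-process.size"
>>             : "taskmanager.memory.total-flink.size";
>>         if (!conf.containsKey(newKey)) {
>>             conf.setString(newKey, legacy);
>>         }
>>     }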
>>
>>
>> I agree that the problem is that the legacy option
>> ‘taskmanager.heap.size’ has different semantics for standalone/container
>> setups. We had it initially falling back to
>> 'taskmanager.memory.total-flink.size’, but I changed that to align it with
>> the container cut-off. Now I see that this changes the standalone setup.
>> +1 for supporting its backwards compatibility differently for
>> standalone/container setups.
>>
>>
>>   - Mini Cluster tries to imitate exact ratio of memory pools as a
>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>> the right approach here.
>>>     For example: I started a relatively large JVM (large heap size of 10
>>> GB) as a test. With the current logic, the system tries to reserve an
>>> additional 6GB for managed memory which is more than there is memory left.
>>> When you see the error that no memory could be allocated, you need to
>>> understand the magic of how this is derived.
>>>     I am trying to think about this from the perspective of using "Flink
>>> as a Library", which the MiniCluster is close to.
>>>     When starting Flink out of a running process, we cannot assume that
>>> we are the only users of that process and that we can mold the process to
>>> our demands. I think a fixed value for managed memory and network memory
>>> would feel more natural in such a setup than a mechanism that is tailored
>>> towards exclusive use of the process.
>>
>>
>> +1 on having fixed values for managed / shuffle memory.
>>
>>
>> Also +1 for that, if the user has not specified any main options to
>> derive memory from. We should also log this fixing of managed / shuffle
>> memory.
>> And I just noticed: we could also sanity-check the framework heap and,
>> if explicitly configured, the task heap against the available JVM heap,
>> and at least log inconsistencies.
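>>
>> For illustration, fixing these pools in the MiniCluster could look like
>> the snippet below (sizes are made up for the example, and I am assuming
>> the current "taskmanager.memory.shuffle.min/max" keys):
>>
>>     // Sketch: pin managed and shuffle memory to fixed sizes for the
>>     // MiniCluster instead of deriving them from the JVM heap size.
>>     Configuration conf = new Configuration();
>>     conf.setString("taskmanager.memory.managed.size", "128m");
>>     conf.setString("taskmanager.memory.shuffle.min", "64m");
>>     conf.setString("taskmanager.memory.shuffle.max", "64m");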
>>
>>   - Some off-heap memory goes into direct memory, some does not. This
>>> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
>>> is counted into MaxDirectMemory while
>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>> we should rename the keys to reflect that. There is no one "off heap"
>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>
>>
>> I believe "taskmanager.memory.task.off-heap.size" is also accounted in
>> the max direct memory size limit. The confusion probably comes from that
>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in
>> its description while "taskmanager.memory.task.off-heap.size" didn't.
>> That's actually because the framework off-heap memory is introduced later
>> in a separate commit. We should fix that.
>>
>> For framework / task off-heap memory, we do not differentiate direct /
>> native memory usage. That means the configure value for these two options
>> could be a mixture of direct / native memory. Since we do not know the
>> portion of direct memory out of the configured value, we have
>> to conservatively account it all into the max direct memory size limit.
>>
>>
>> *==>  In that case, I am a bit confused. For the total size calculation,
>> it is fine. But why do we then set MaxDirectMemory? It is a difficult
>> parameter, and the main reason to set it was (if I recall correctly) to
>> trigger GC based on direct memory allocation (to free heap structures
>> that in turn release direct memory). If the limit is anyway too high
>> (because we also count native memory in there), such that we can exceed
>> the total process (container) memory, why do we set it at all?*
>>
>>
>> I always thought of it as also providing a safety net for direct
>> allocations, but the GC aspect looks more important.
>>
>> +1 for fixing the docs for 'taskmanager.memory.task.off-heap.size’ and
>> renaming it to ‘direct', as this is what really happens, if we want to
>> support the direct limit more exactly than now.
>>
>> I also think that it is hard to separate direct / native memory unless
>> we introduce even more options.
>> If a user wants to keep the direct limit tight to a certain value but
>> also use native memory outside of it,
>> she would have to increase something else, like the JVM overhead, to
>> account for it; there is no better way.
>> Having more options to account for native memory outside of the direct
>> limit complicates things, but they can be introduced later if needed.
>>
>>   - What do you think about renaming
>>> "taskmanager.memory.total-flink.size" to "taskmanager.memory.flink.size"
>>> and "taskmanager.memory.total-process.size" to
>>> "taskmanager.memory.process.size" (or "taskmanager.memory.jvm.total"). I
>>> think these keys may be a bit less clumsy (dropping the "total-") without
>>> loss of expressiveness.
>>
>>
>> +1 on this.
>>
>>
>> +1 as well. Also an option: 'taskmanager.memory.total-process.size’ ->
>> ‘taskmanager.memory.jvm.process.size’,
>> although it could also just be mentioned in the docs that we mean the
>> JVM process.
>>
>>   - The network memory keys are now called
>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>> understood as a redistribution (random, or maybe by hash of key). As an
>>> example, there are many discussions about "shuffle join versus broadcast
>>> join", where "shuffle" is the synonym for "re-partitioning". We use that
>>> memory however for all network operations, like forward pipes, broadcasts,
>>> receiver-side buffering on checkpoints, etc. I find the name "*.shuffle.*"
>>> confusing, I am wondering if users would find that as well. So throwing in
>>> the suggestion to call the options "taskmanager.memory.network.*".
>>
>>
>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the other
>> hand, one can also argue that this part of memory is used by
>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>> directly to the shuffle service components.
>>
>>
>> *==> In that case, the name "Shuffle Environment" may be a bit incorrect,
>> because it does more than just shuffles. The ShuffleEnvironment is
>> also more internal, so the name is not too critical. This isn't super high
>> priority for me, but if we want to adjust it, better earlier than later.*
>>
>>
>> This is also a bit of a controversial topic for me. Indeed, we have
>> always used ’network’ for this concept of task data shuffling over the
>> network, and changing it can confuse existing users.
>>
>> On the other hand, for new users and in the long term, ’network’ can
>> mislead one into concluding that all network memory is managed by this
>> option. Also, other types of shuffle might not deal with the network
>> directly at all.
>>
>> By calling it shuffle, we were somewhat biased by understanding it in
>> terms of map/reduce. This is rather an inter-task data exchange.
>> Maybe then 'taskmanager.memory.shuffle.communication.*’ or
>> ‘taskmanager.memory.task.shuffle/communication/io/network.*’.
>>
>>   - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys say
>>> that it also accounts for I/O direct memory, but the parameter is not
>>> counted into the MaxDirectMemory parameter.
>>
>>
>> True. Since we already have the framework off-heap memory accounted for
>> ad-hoc direct memory usage, also accounting all of the JVM overhead into
>> the max direct memory limit does not seem necessary. I would suggest
>> removing "I/O direct memory" from the description, and explicitly
>> mentioning that this option does not account for direct memory and will
>> not be counted into the max direct memory limit. WDYT?
>>
>>
>> +1
>>
>>   - Can make the new ConfigOptions strongly typed with the new
>>> configuration options. For example, directly use MemorySize typed options.
>>> That makes validation automatic and saves us from breaking the options
>>> later.
>>
>> +1. Wasn't aware of the new memory type config options.
>>
>>
>> +1
>>
>> *==> Thanks. Do you need help with adjusting this?*
>>
>>
>> I would appreciate it.
>>
>> Also a small side note: we now use MemorySize extensively in logs, but
>> it might not always be readable, as its string representation is
>> currently in bytes only
>> and does not reduce big values to KB/MB/etc. We could at least have some
>> .prettyPrint function to use in logs.
>> And .fromMegabytes/etc factory methods would improve code readability
>> compared to .parse(size + “m”).
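>>
>> For example, something along these lines (a sketch only; neither method
>> exists in MemorySize yet):
>>
>>     // Hypothetical additions to org.apache.flink.configuration.MemorySize.
>>     public static MemorySize fromMegabytes(long megabytes) {
>>         return new MemorySize(megabytes << 20);
>>     }
>>
>>     public String prettyPrint() {
>>         long b = getBytes();
>>         if (b >= (1L << 30)) {
>>             return String.format("%.1f gb (%d bytes)", b / (double) (1L << 30), b);
>>         }
>>         if (b >= (1L << 20)) {
>>             return String.format("%.1f mb (%d bytes)", b / (double) (1L << 20), b);
>>         }
>>         if (b >= (1L << 10)) {
>>             return String.format("%.1f kb (%d bytes)", b / (double) (1L << 10), b);
>>         }
>>         return b + " bytes";
>>     }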
>>
>> Best,
>> Andrey
>>
>> On 20 Dec 2019, at 12:13, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi Xintong!
>>
>> Please find my answers inline:
>>
>>>   - "taskmanager.memory.size" (old main config option) is replaced by
>>>> "taskmanager.memory.total-process.size" which has a different meaning in
>>>> standalone setups. The old option did not subtract metaspace and other
>>>> overhead, while the new option does. That means that with the default
>>>> config, standalone clusters get quite a bit less memory. (independent of
>>>> managed memory going off heap).
>>>>     I am wondering if we could interpret "taskmanager.memory.size" as
>>>> the deprecated key for "taskmanager.memory.total-flink.size". That would be
>>>> in line with the old mechanism (assuming managed memory is set to off 
>>>> heap).
>>>>     The effect would be that the container size on Yarn/Mesos
>>>> increases, because from "taskmanager.memory.total-flink.size", we need to
>>>> add overhead and metaspace to reach the total process size, rather than
>>>> cutting off memory. But if we want, we could even adjust for that in the
>>>> active resource manager, getting full backwards compatibility on that part.
>>>>     Curious to hear more thoughts there.
>>>
>>>
>>> I believe you mean "taskmanager.heap.size".
>>>
>>
>> *==> Yes*
>>
>>
>>
>>> I think the problem here is that the legacy "taskmanager.heap.size" was
>>> used differently in standalone setups and active yarn / mesos setups, and
>>> such different calculation logic and behavior is exactly what we want to
>>> avoid with FLIP-49. Therefore, I'm not in favor of treating
>>> "taskmanager.memory.total-flink.size" differently for standalone and
>>> active setups.
>>>
>>> I think what we really want is probably to map "taskmanager.heap.size"
>>> to different new config options in different setups. How about we do not
>>> mark "taskmanager.heap.size" as a deprecated key for either
>>> "taskmanager.memory.total-process.size" or
>>> "taskmanager.memory.total-flink.size". Instead, we parse it (if explicitly
>>> configured) in the startup scripts / active resource managers, and set the
>>> value to "taskmanager.memory.total-flink.size" in the scripts and to
>>> "taskmanager.memory.total-process.size" in active resource managers (if
>>> the new config options are not configured). We can provide util methods in
>>> TaskExecutorResourceUtils for such conversions, to keep all the
>>> configuration logic in one place.
>>>
>>
>> *==> This is pretty much what I meant as well (maybe my description was
>> not very clear), so +1 for that mechanism*
>>
>>
>>   - Mini Cluster tries to imitate exact ratio of memory pools as a
>>>> standalone setup. I get the idea behind that, but I am wondering if it is
>>>> the right approach here.
>>>>     For example: I started a relatively large JVM (large heap size of
>>>> 10 GB) as a test. With the current logic, the system tries to reserve an
>>>> additional 6GB for managed memory which is more than there is memory left.
>>>> When you see the error that no memory could be allocated, you need to
>>>> understand the magic of how this is derived.
>>>>     I am trying to think about this from the perspective of using
>>>> "Flink as a Library", which the MiniCluster is close to.
>>>>     When starting Flink out of a running process, we cannot assume that
>>>> we are the only users of that process and that we can mold the process to
>>>> our demands. I think a fixed value for managed memory and network memory
>>>> would feel more natural in such a setup than a mechanism that is tailored
>>>> towards exclusive use of the process.
>>>
>>>
>>> +1 on having fixed values for managed / shuffle memory.
>>>
>>
>> *==> Cool, let's also see what Andrey and Till think here.*
>>
>>
>>
>>>   - Some off-heap memory goes into direct memory, some does not. This
>>>> confused me a bit. For example "taskmanager.memory.framework.off-heap.size"
>>>> is counted into MaxDirectMemory while
>>>> "taskmanager.memory.task.off-heap.size" is counted as native memory. Maybe
>>>> we should rename the keys to reflect that. There is no one "off heap"
>>>> memory type after all. Maybe use "taskmanager.memory.task.native: XXXmb"
>>>> and "taskmanager.memory.framework.direct: XXXmb" instead?
>>>
>>>
>>> I believe "taskmanager.memory.task.off-heap.size" is also accounted in
>>> the max direct memory size limit. The confusion probably comes from that
>>> "taskmanager.memory.framework.off-heap.size" explicitly mentioned that in
>>> its description while "taskmanager.memory.task.off-heap.size" didn't.
>>> That's actually because the framework off-heap memory is introduced later
>>> in a separate commit. We should fix that.
>>>
>>> For framework / task off-heap memory, we do not differentiate direct /
>>> native memory usage. That means the configure value for these two options
>>> could be a mixture of direct / native memory. Since we do not know the
>>> portion of direct memory out of the configured value, we have
>>> to conservatively account it all into the max direct memory size limit.
>>>
>>
>> *==>  In that case, I am a bit confused. For the total size calculation,
>> it is fine. But why do we then set MaxDirectMemory? It is a difficult
>> parameter, and the main reason to set it was (if I recall correctly) to
>> trigger GC based on direct memory allocation (to free heap structures
>> that in turn release direct memory). If the limit is anyway too high
>> (because we also count native memory in there), such that we can exceed
>> the total process (container) memory, why do we set it at all?*
>>
>>
>>   - The network memory keys are now called
>>>> "taskmanager.memory.shuffle.*". To my knowledge, shuffle is usually
>>>> understood as a redistribution (random, or maybe by hash of key). As an
>>>> example, there are many discussions about "shuffle join versus broadcast
>>>> join", where "shuffle" is the synonym for "re-partitioning". We use that
>>>> memory however for all network operations, like forward pipes, broadcasts,
>>>> receiver-side buffering on checkpoints, etc. I find the name "*.shuffle.*"
>>>> confusing, I am wondering if users would find that as well. So throwing in
>>>> the suggestion to call the options "taskmanager.memory.network.*".
>>>
>>>
>>> +0 on this one. I'm ok with "taskmanager.memory.network.*". On the other
>>> hand, one can also argue that this part of memory is used by
>>> ShuffleEnvironment, and the key "taskmanager.memory.shuffle.*" points more
>>> directly to the shuffle service components.
>>>
>>
>> *==> In that case, the name "Shuffle Environment" may be a bit incorrect,
>> because it does more than just shuffles. The ShuffleEnvironment is
>> also more internal, so the name is not too critical. This isn't super high
>> priority for me, but if we want to adjust it, better earlier than later.*
>>
>>
>>    - The descriptions for the "taskmanager.memory.jvm-overhead.*" keys
>> say that it also accounts for I/O direct memory, but the parameter is not
>> counted into the MaxDirectMemory parameter.
>>
>>>
>>> True. Since we already have the framework off-heap memory accounted for
>>> ad-hoc direct memory usage, also accounting all of the JVM overhead into
>>> the max direct memory limit does not seem necessary. I would suggest
>>> removing "I/O direct memory" from the description, and explicitly
>>> mentioning that this option does not account for direct memory and will
>>> not be counted into the max direct memory limit. WDYT?
>>>
>>
>> *==> Sounds good. *
>>
>>
>>
>>>   - Can make the new ConfigOptions strongly typed with the new
>>>> configuration options. For example, directly use MemorySize typed options.
>>>> That makes validation automatic and saves us from breaking the options
>>>> later.
>>>
>>> +1. Wasn't aware of the new memory type config options.
>>>
>>
>> *==> Thanks. Do you need help with adjusting this?*
>>
>>
>> Best,
>> Stephan
>>
>>
>>
>>
