Hi all,

Thanks for all the feedback so far.

The discussion has been going on for some time. If there are no
further comments, we will start a vote today.

Best,
Yuxin


On Thu, Dec 29, 2022 at 5:37 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:

> Hi, everyone
>
> Thanks for the reply and the discussion.
>
> We discussed this FLIP offline with @Guowei Ma, @Dong Lin, and @Yanfei Lei
> and reached a consensus. Based on the offline discussions and the suggestions
> from @Weihua Hu, the following changes have been made to the FLIP.
>
> 1. Changes in public interfaces.
> - Updated the description of the newly added config option to describe it
> more clearly.
> - The new config option will be marked as experimental in the first release,
> and we will revisit this in the next release based on user feedback.
> - In the long run, with the new config option, we think the original two
> configs can be deprecated. At this stage, since the new option is still
> experimental, we will not deprecate them immediately.
> - Renamed the config key to
> taskmanager.memory.network.read-buffer.required-per-gate.max for more clarity.
> 2. Modified the floating buffer calculation method.
> - When the memory usage reaches the threshold, the number of exclusive
> buffers is reduced gradually, in a fine-grained manner, rather than being
> dropped to 0 directly (a rough sketch of this follows below).
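>
> To make this concrete, here is a minimal, illustrative sketch of the intended
> fallback. It is only a sketch based on the description above, not the actual
> implementation; the method and parameter names are illustrative, and the
> assumption that the check is done against the new per-gate threshold is mine.
>
>     // Illustrative sketch only: step the exclusive buffers per channel down
>     // from the configured value towards 1, and fall back to an
>     // all-floating-buffer gate only if even 1 exceeds the threshold.
>     class ReadBufferFallbackSketch {
>         static int decideExclusiveBuffersPerChannel(
>                 int configuredExclusivePerChannel, // buffers-per-channel
>                 int floatingBuffersPerGate,        // floating-buffers-per-gate
>                 int numChannels,                   // input channels of this gate
>                 int maxBuffersPerGate) {           // read-buffer.required-per-gate.max
>             for (int exclusive = configuredExclusivePerChannel; exclusive >= 1; exclusive--) {
>                 if (exclusive * numChannels + floatingBuffersPerGate <= maxBuffersPerGate) {
>                     return exclusive;
>                 }
>             }
>             // Even 1 exclusive buffer per channel exceeds the threshold:
>             // this gate uses floating buffers only.
>             return 0;
>         }
>     }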
>
> Best,
> Yuxin
>
>
> On Thu, Dec 29, 2022 at 2:48 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>
>> Hi, Roman
>>
>> Sorry, I missed one of your questions just now.
>>
>> > if the two configuration options are still in use, why does the FLIP
>> > propose to deprecate them?
>> These two configs are usually tuned to work around the insufficient-memory
>> issue. After introducing this improvement, it should generally no longer be
>> necessary to adjust them for that purpose. So I propose to deprecate them in
>> the future, once the @Experimental annotation of the newly added config
>> option is removed.
>>
>> Best,
>> Yuxin
>>
>>
>> On Wed, Dec 28, 2022 at 8:10 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>>> Thanks for your reply Yuxin,
>>>
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>>> > configurations, which are not calculated. I have described them in the
>>> FLIP
>>> > motivation section.
>>>
>>> The motivation section says about floating buffers:
>>> > FloatingBuffersPerGate is within the range of
>>> > [numFloatingBufferThreshold, ExclusiveBuffersPerChannel * numChannels +
>>> > DefaultFloatingBuffersPerGate] ...
>>> So my question is: which exact value in this range will it have, and how
>>> and where will it be computed?
>>>
>>> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
>>> to calculate it dynamically (by linear search
>>> from taskmanager.network.memory.buffers-per-channel down to 0).
>>>
>>> Also, if the two configuration options are still in use, why does the
>>> FLIP
>>> propose to deprecate them?
>>>
>>> Besides that, wouldn't it be more clear to separate motivation from the
>>> proposed changes?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>>>
>>> > Hi Yuxin
>>> >
>>> >
>>> > Thanks for the proposal, big + 1 for this FLIP.
>>> >
>>> >
>>> >
>>> > It is difficult for users to calculate the size of network memory. If the
>>> > setting is too small, the task cannot be started; if the setting is too
>>> > large, resources may be wasted. Ideally, the Flink framework can
>>> > automatically set a reasonable value, but I have a small question: network
>>> > memory is related not only to the parallelism of the task but also to the
>>> > complexity of the task DAG. The more complex a DAG is, the larger the
>>> > buffers needed for shuffle write and shuffle read. How can we determine
>>> > how many RS and IG a DAG has?
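>>> >
>>> > For a rough sense of scale (assuming the documented defaults of 2 exclusive
>>> > buffers per channel, 8 floating buffers per gate, and 32 KB buffers), a
>>> > single input gate with 100 input channels needs about 100 * 2 + 8 = 208
>>> > read buffers, i.e. roughly 6.5 MB, and every additional gate in the DAG
>>> > adds a similar amount on top of the write-side buffers.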
>>> >
>>> >
>>> >
>>> > Best
>>> > JasonLee
>>> >
>>> >
>>> > ---- Replied Message ----
>>> > | From | Yuxin Tan<tanyuxinw...@gmail.com> |
>>> > | Date | 12/28/2022 18:29 |
>>> > | To | <dev@flink.apache.org> |
>>> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory
>>> configurations
>>> > for TaskManager |
>>> > Hi, Roman
>>> >
>>> > Thanks for the reply.
>>> >
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
>>> > the configuration; they are not calculated. I have described them in the
>>> > FLIP motivation section.
>>> >
>>> > 3. Each gate requires at least one buffer...
>>> > The timeout exception occurs when the ExclusiveBuffersPerChannel buffers
>>> > cannot be requested from the NetworkBufferPool, which is not caused by the
>>> > changes of this FLIP. In addition, setting ExclusiveBuffersPerChannel to 0
>>> > when using floating buffers can also decrease the probability of this
>>> > exception.
>>> >
>>> > 4. It would be great to have experimental results for jobs with different
>>> > exchange types.
>>> > Thanks for the suggestion. I have run a test with different exchange types,
>>> > forward and rescale, and the results show no difference from the all-to-all
>>> > type. This is also understandable, because the network memory usage is
>>> > calculated from numChannels, independent of the edge type.
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> >
>>> > On Wed, Dec 28, 2022 at 5:27 AM Roman Khachatryan <ro...@apache.org> wrote:
>>> >
>>> > Hi everyone,
>>> >
>>> > Thanks for the proposal and the discussion.
>>> >
>>> > I couldn't find many details on how exactly the values of
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
>>> > I guess that
>>> > - the threshold evaluation is done on the JM
>>> > - the floating buffer calculation is done on the TM based on the currently
>>> > available memory, so it does not take into account any future tasks
>>> > submitted for that (or another) job
>>> > Is that correct?
>>> >
>>> > If so, I see the following potential issues:
>>> >
>>> > 1. Each (sub)task might have different values because the actual
>>> > available memory might be different. E.g. some tasks might use exclusive
>>> > buffers and others only floating. That could lead to significant skew
>>> > in processing speed, and in turn to issues with checkpoints and watermarks.
>>> >
>>> > 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
>>> > different memory configuration. That, coupled with different values per
>>> > subtask and operator, makes the performance analysis more difficult.
>>> >
>>> > (Regardless of whether it's done on the TM or the JM):
>>> > 3. Each gate requires at least one buffer [1]. So, in the case when no
>>> > memory is available, the TM will throw an allocation timeout exception
>>> > instead of an immediate "Insufficient buffers" exception. A delay here
>>> > (allocation timeout) seems like a regression.
>>> > Besides that, the regression depends on how much memory is actually
>>> > available and how contended it is, doesn't it?
>>> > Should there still be a lower threshold of available memory, below which
>>> > the job (task) isn't accepted?
>>> > 4. The same threshold for all types of shuffles will likely result in
>>> > using exclusive buffers for point-wise connections and floating buffers
>>> > for all-to-all ones. I'm not sure if that's always optimal. It would be
>>> > great to have experimental results for jobs with different exchange
>>> > types, WDYT?
>>> >
>>> > [1]
>>> > https://issues.apache.org/jira/browse/FLINK-24035
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> >
>>> > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com>
>>> wrote:
>>> >
>>> > Hi, Weihua
>>> >
>>> > Thanks for your suggestions.
>>> >
>>> > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
>>> > total buffer count is not enough?
>>> >
>>> > I think it's a good idea. I will try it and check the results in a PoC.
>>> > Before all read buffers fall back to floating buffers, I will first try
>>> > to use (ExclusiveBuffersPerChannel - i) buffers per channel. For example,
>>> > if the user has configured ExclusiveBuffersPerChannel to 4, it will check
>>> > whether all read buffers are sufficient, going from 4 down to 1. Only when
>>> > ExclusiveBuffersPerChannel is 1 for all channels and the read buffers are
>>> > still insufficient will all read buffers use floating buffers.
>>> > If the test results prove better, the FLIP will use this method.
>>> >
>>> > 2. Do we really need to change the default value of
>>> > 'taskmanager.memory.network.max'?
>>> >
>>> > Changing taskmanager.memory.network.max will indeed affect some users,
>>> > but a user is only affected when all 3 conditions are fulfilled:
>>> > 1) The total Flink TM memory is larger than 10g (because the network
>>> > memory ratio is 0.1).
>>> > 2) taskmanager.memory.network.max was not explicitly configured.
>>> > 3) Other memory, such as managed memory or heap memory, is insufficient.
>>> > I think the number of jobs fulfilling these conditions is small, because
>>> > when a TM uses such a large amount of memory, the network memory
>>> > requirement may also be large. And when encountering the issue, the
>>> > rollback is very simple: configure taskmanager.memory.network.max to 1g
>>> > or another value.
>>> > In addition, the reason for modifying the default value is to simplify the
>>> > network configuration in most scenarios. This change does affect a few
>>> > usage scenarios, but we should admit that no default value can meet the
>>> > requirements of all scenarios.
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> >
>>> > On Mon, Dec 26, 2022 at 8:35 PM Weihua Hu <huweihua....@gmail.com> wrote:
>>> >
>>> > Hi Yuxin,
>>> > Thanks for the proposal.
>>> >
>>> > "Insufficient number of network buffers" exceptions also bother us.
>>> > It's
>>> > too hard for users to figure out
>>> > how much network buffer they really need. It relates to partitioner
>>> > type,
>>> > parallelism, slots per taskmanager.
>>> >
>>> > Since streaming jobs are our primary scenario, I have some questions
>>> > about
>>> > streaming jobs.
>>> >
>>> > 1. In this FLIP, all read buffers will use floating buffers when the total
>>> > buffer count is more than
>>> > 'taskmanager.memory.network.read-required-buffer.max'. Competition in
>>> > buffer allocation leads to performance regression.
>>> > How about reducing ExclusiveBuffersPerChannel to 1 first when the total
>>> > buffer count is not enough?
>>> > Will this reduce the performance regression in streaming?
>>> >
>>> > 2. Changing taskmanager.memory.network.max will affect users migrating
>>> > from lower versions.
>>> > IMO, the network buffer size should not increase with total memory,
>>> > especially for streaming jobs in application mode.
>>> > For example, some ETL jobs with a rescale partitioner only require a few
>>> > network buffers.
>>> > And we already have 'taskmanager.memory.network.read-required-buffer.max'
>>> > to control the maximum read network buffer usage.
>>> > Do we really need to change the default value of
>>> > 'taskmanager.memory.network.max'?
>>> >
>>> > Best,
>>> > Weihua
>>> >
>>> >
>>> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com>
>>> > wrote:
>>> >
>>> > Hi all,
>>> > Thanks to everyone for the replies and feedback!
>>> >
>>> >
>>> > After combining everyone's comments, the main concerns and the
>>> > corresponding adjustments are as follows.
>>> >
>>> >
>>> > @Guowei Ma, thanks for your feedback.
>>> > should we introduce a _new_ non-orthogonal
>>> > option (`taskmanager.memory.network.required-buffer-per-gate.max`). That is
>>> > to say, the option will affect both streaming and batch shuffle behavior at
>>> > the same time.
>>> >
>>> > 1. The default value can meet most requirements in both Streaming and
>>> > Batch scenarios. By design, we do not want users to adjust this config
>>> > option; it is added only to preserve the possibility of adjustment for
>>> > users.
>>> > 2. In the few cases where users really want to adjust this option, they may
>>> > not want to adjust it according to Streaming or Batch, but rather, for
>>> > example, according to the parallelism of the job.
>>> > 3. Regarding the performance of streaming shuffle, the same problem of
>>> > insufficient memory also exists for Streaming jobs. We introduced this
>>> > configuration to enable users to decouple memory and parallelism, although
>>> > it may affect performance to some degree. By default, the feature is
>>> > disabled and does not affect performance. However, the added configuration
>>> > enables users to choose to decouple memory usage and parallelism for
>>> > Streaming jobs.
>>> >
>>> > It's better not to expose more implementation-related concepts to users.
>>> >
>>> > Thanks for your suggestion. I will modify the option name to avoid exposing
>>> > implementation-related concepts. I have changed it to
>>> > `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>>> >
>>> >
>>> >
>>> > @Dong Lin, thanks for your reply.
>>> > it might be helpful to add a dedicated public interface section to
>>> > describe the config key and config semantics.
>>> >
>>> > Thanks for your suggestion. I have added a public interface section to
>>> > describe the config key and config semantics clearly.
>>> >
>>> > This FLIP seems to add more configs without removing any config from
>>> > Flink.
>>> >
>>> > This FLIP aims to reduce the number of options that need to be adjusted
>>> > when using Flink. After the FLIP, the default values can meet the
>>> > requirements in most scenarios without modifying any config options
>>> > (`taskmanager.network.memory.buffers-per-channel` and
>>> > `taskmanager.network.memory.floating-buffers-per-gate`), which helps
>>> > improve out-of-the-box usability. In the long run, these two parameters,
>>> > `taskmanager.network.memory.buffers-per-channel` and
>>> > `taskmanager.network.memory.floating-buffers-per-gate`, may indeed be
>>> > deprecated to reduce user-facing parameters, but from the perspective of
>>> > compatibility, we need to pay attention to users' feedback before deciding
>>> > to deprecate them.
>>> >
>>> >
>>> >
>>> > @Yanfei Lei, thanks for your feedback.
>>> > 1. Though the option is at the cluster level, the default value differs
>>> > according to the job type. In other words, by default, for Batch jobs the
>>> > option is enabled with a value of 1000, and for Streaming jobs the option
>>> > is not enabled.
>>> >
>>> > 2. I think this is a good point. The total floating buffer count will not
>>> > change with
>>> > ExclusiveBuffersPerChannel (taskmanager.network.memory.buffers-per-channel),
>>> > because it is the maximum memory threshold. But if the user explicitly
>>> > specifies ExclusiveBuffersPerChannel, the calculated result of
>>> > ExclusiveBuffersPerChannel * numChannels will change with it.
>>> >
>>> >
>>> > Thanks again for all feedback!
>>> >
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> >
>>> > On Mon, Dec 26, 2022 at 5:18 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for creating this FLIP.
>>> >
>>> > It's good if Flink does not require users to set a very large network
>>> > memory, or tune the advanced (hard-to-understand) per-channel/per-gate
>>> > buffer configs, to avoid "Insufficient number of network buffers"
>>> > exceptions, which can easily happen for large-scale jobs.
>>> >
>>> > Regarding the new config
>>> > "taskmanager.memory.network.read-required-buffer.max",
>>> > I think it's still an advanced config which users may find hard to tune.
>>> > However, given that in most cases users will not need to set it, I think
>>> > it's acceptable.
>>> >
>>> > So +1 for this FLIP.
>>> >
>>> > In the future, I think Flink should adaptively decide whether to use
>>> > exclusive buffers according to whether there are sufficient network buffers
>>> > at runtime. Users would then no longer need to understand the above
>>> > configuration. This may require supporting transitions between exclusive
>>> > buffers and floating buffers.
>>> > A problem with making all buffers floating is that too few network buffers
>>> > can result in task slowness that is hard for users to identify. So
>>> > improvements to metrics and the web UI are also needed to expose such
>>> > issues.
>>> >
>>> > Thanks,
>>> > Zhu
>>> >
>>> > On Mon, Dec 26, 2022 at 11:13 AM Yanfei Lei <fredia...@gmail.com> wrote:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for the proposal!
>>> >
>>> > After reading the FLIP, I have some questions about the default value.
>>> > This FLIP seems to introduce a *new* config option
>>> > (taskmanager.memory.network.required-buffer-per-gate.max) to control the
>>> > network memory usage.
>>> > 1. Is this configuration at the job level or the cluster level? As the FLIP
>>> > describes, the default values for Batch jobs and Stream jobs are different.
>>> > If an explicit value is set at the cluster level, will it affect all Batch
>>> > jobs and Stream jobs on the cluster?
>>> >
>>> > 2. The default value for Batch jobs depends on the value of
>>> > ExclusiveBuffersPerChannel (taskmanager.network.memory.buffers-per-channel).
>>> > If the value of ExclusiveBuffersPerChannel is changed, does
>>> > "taskmanager.memory.network.required-buffer-per-gate.max" need to change
>>> > with it?
>>> >
>>> >
>>> > Best,
>>> > Yanfei
>>> >
>>> > On Sun, Dec 25, 2022 at 8:58 AM Dong Lin <lindon...@gmail.com> wrote:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for proposing the FLIP!
>>> >
>>> > The motivation section makes sense. But it seems that the proposed change
>>> > section mixes the proposed config with the evaluation results. It is a bit
>>> > hard to understand what configs are proposed and how to describe these
>>> > configs to users. Given that the configuration setting is part of public
>>> > interfaces, it might be helpful to add a dedicated public interface section
>>> > to describe the config key and config semantics, as suggested in the FLIP
>>> > template here
>>> > <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>.
>>> >
>>> > This FLIP seems to add more configs without removing any config from
>>> > Flink. Intuitively this can make the Flink configuration harder rather
>>> > than simpler. Maybe we can get a better idea after we add a public
>>> > interface section to clarify those configs.
>>> >
>>> > Thanks,
>>> > Dong
>>> >
>>> >
>>> > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>>> >
>>> > Hi, devs,
>>> >
>>> > I'd like to start a discussion about FLIP-266: Simplify network memory
>>> > configurations for TaskManager [1].
>>> >
>>> > When using Flink, users may encounter the following issues that affect
>>> > usability.
>>> > 1. The job may fail with an "Insufficient number of network buffers"
>>> > exception.
>>> > 2. Flink network memory size adjustment is complex.
>>> > When encountering these issues, users can solve some of the problems by
>>> > adding or adjusting parameters. However, multiple memory config options
>>> > need to be changed, and adjusting them requires understanding the detailed
>>> > internal implementation, which is impractical for most users.
>>> >
>>> > To simplify network memory configurations for the TaskManager and improve
>>> > Flink usability, this FLIP proposes some optimization solutions for these
>>> > issues.
>>> >
>>> > Looking forward to your feedback.
>>> >
>>> > [1]
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>>> >
>>> > Best regards,
>>> > Yuxin
>>> >
>>>
>>
