Hi all,

Thanks for all the feedback so far.
The discussion has been going on for some time. If there are no more new comments, we will start a vote today.

Best,
Yuxin

Yuxin Tan <tanyuxinw...@gmail.com> wrote on Thu, Dec 29, 2022 at 17:37:

> Hi, everyone
>
> Thanks for the reply and the discussion.
>
> We discussed this with @Guowei Ma, @Dong Lin, and @Yanfei Lei offline, and reached a consensus on this FLIP. Based on the offline discussions and the suggestions from @Weihua Hu, the following changes have been made to the FLIP.
>
> 1. Changes in public interfaces.
> - Updated the description of the newly added config to describe the option more clearly.
> - The new config will be marked as experimental in the first release, and we will revisit this in the next release based on user feedback.
> - In the long run, with the new config, we think the original two configs can be deprecated. At this stage, since the new config is still experimental, we will not immediately deprecate them.
> - Changed the config key name to taskmanager.memory.network.read-buffer.required-per-gate.max for more clarity.
> 2. Modified the floating buffer calculation method.
> - When the memory used reaches the threshold, the number of exclusive buffers is gradually reduced in a fine-grained manner, rather than directly reducing the number of exclusive buffers to 0.
>
> Best,
> Yuxin
>
> Yuxin Tan <tanyuxinw...@gmail.com> wrote on Thu, Dec 29, 2022 at 14:48:
>
>> Hi, Roman
>>
>> Sorry that I missed one question just now.
>>
>> > if the two configuration options are still in use, why does the FLIP propose to deprecate them?
>> These two configs are usually used to avoid the memory issue, but after introducing the improvement, I think it is generally no longer necessary to adjust these two configurations to avoid the issue. So I propose to deprecate them in the future, when the @Experimental annotation of the newly added config is removed.
>>
>> Best,
>> Yuxin
>>
>> Roman Khachatryan <ro...@apache.org> wrote on Wed, Dec 28, 2022 at 20:10:
>>
>>> Thanks for your reply Yuxin,
>>>
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from configurations, which are not calculated. I have described them in the FLIP motivation section.
>>>
>>> The motivation section says about floating buffers:
>>> > FloatingBuffersPerGate is within the range of [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate] ...
>>> So my question is what value exactly in this range it will have, and how and where it will be computed.
>>>
>>> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread to calculate it dynamically (by linear search from taskmanager.network.memory.buffers-per-channel down to 0).
>>>
>>> Also, if the two configuration options are still in use, why does the FLIP propose to deprecate them?
>>>
>>> Besides that, wouldn't it be clearer to separate the motivation from the proposed changes?
>>>
>>> Regards,
>>> Roman
>>>
>>> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>>>
>>> > Hi Yuxin
>>> >
>>> > Thanks for the proposal, big +1 for this FLIP.
>>> >
>>> > It is difficult for users to calculate the size of network memory. If the setting is too small, the task cannot be started. If the setting is too large, there may be a waste of resources. The Flink framework should automatically set a reasonable value as far as possible, but I have a small question. Network memory is not only related to the parallelism of the task, but also to the complexity of the task DAG. The more complex the DAG is, the larger the buffers required for shuffle write and shuffle read. How can we determine how many RS and IG a DAG has?
>>> >
>>> > Best
>>> > JasonLee
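To make the range quoted above and JasonLee's question a bit more concrete, here is a rough back-of-the-envelope sketch (in Java) of the per-gate read buffer arithmetic under the current model. The class name, the parallelism, and the example numbers are illustrative assumptions, not values taken from the FLIP:

// Rough sketch of the per-gate read buffer arithmetic quoted above.
// All names and numbers here are illustrative, not Flink internals.
public class ReadBufferSketch {

    // Upper end of the quoted range:
    // ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate
    static long requiredBuffers(int numChannels, int exclusivePerChannel, int floatingPerGate) {
        return (long) exclusivePerChannel * numChannels + floatingPerGate;
    }

    public static void main(String[] args) {
        int exclusivePerChannel = 2; // default of taskmanager.network.memory.buffers-per-channel
        int floatingPerGate = 8;     // default of taskmanager.network.memory.floating-buffers-per-gate

        // A point-wise (forward) input gate has 1 channel per subtask, while an
        // all-to-all gate at parallelism 1000 has 1000 channels, so the requirement
        // grows with the parallelism of each shuffle edge.
        long forward = requiredBuffers(1, exclusivePerChannel, floatingPerGate);
        long allToAll = requiredBuffers(1000, exclusivePerChannel, floatingPerGate);
        System.out.println("forward gate:    " + forward + " buffers");
        System.out.println("all-to-all gate: " + allToAll + " buffers (~"
                + (allToAll * 32 / 1024) + " MiB with 32 KiB segments)");
    }
}

Roughly speaking, a task has one input gate per input edge and one result partition per output edge, so the totals grow with both the parallelism and the number of shuffle edges in the DAG, which is what makes the required network memory hard to predict by hand.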
>>> > ---- Replied Message ----
>>> > | From | Yuxin Tan <tanyuxinw...@gmail.com> |
>>> > | Date | 12/28/2022 18:29 |
>>> > | To | <dev@flink.apache.org> |
>>> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager |
>>> >
>>> > Hi, Roman
>>> >
>>> > Thanks for the reply.
>>> >
>>> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from configurations, which are not calculated. I have described them in the FLIP motivation section.
>>> >
>>> > 3. Each gate requires at least one buffer...
>>> > The timeout exception occurs when the ExclusiveBuffersPerChannel cannot be requested from the NetworkBufferPool, which is not caused by the change of this FLIP. In addition, setting ExclusiveBuffersPerChannel to 0 when using floating buffers can also decrease the probability of this exception.
>>> >
>>> > 4. It would be great to have experimental results for jobs with different exchange types.
>>> > Thanks for the suggestion. I have a test with different exchange types, forward and rescale, and the results show no differences from the all-to-all type, which is also understandable, because the network memory usage is calculated with numChannels, independent of the edge type.
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> > Roman Khachatryan <ro...@apache.org> wrote on Wed, Dec 28, 2022 at 05:27:
>>> >
>>> > Hi everyone,
>>> >
>>> > Thanks for the proposal and the discussion.
>>> >
>>> > I couldn't find many details on how exactly the values of ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated. I guess that
>>> > - the threshold evaluation is done on the JM
>>> > - the floating buffers calculation is done on the TM based on the currently available memory, so it does not take into account any future tasks submitted for that (or another) job
>>> > Is that correct?
>>> >
>>> > If so, I see the following potential issues:
>>> >
>>> > 1. Each (sub)task might have different values because the actual available memory might be different. E.g. some tasks might use exclusive buffers and others only floating. That could lead to significant skew in processing speed, and in turn to issues with checkpoints and watermarks.
>>> >
>>> > 2. Re-deployment of a task (e.g. on job failure) might lead to a completely different memory configuration. That, coupled with different values per subtask and operator, makes performance analysis more difficult.
>>> >
>>> > (Regardless of whether it's done on the TM or JM):
>>> > 3. Each gate requires at least one buffer [1]. So, in case no memory is available, the TM will throw an allocation timeout exception instead of an insufficient buffers exception immediately. A delay here (allocation timeout) seems like a regression. Besides that, the regression depends on how much memory is actually available and how much it is contended, doesn't it? Should there still be a lower threshold of available memory, below which the job (task) isn't accepted?
>>> > 4. The same threshold for all types of shuffles will likely result in using exclusive buffers for point-wise connections and floating buffers for all-to-all ones. I'm not sure if that's always optimal. It would be great to have experimental results for jobs with different exchange types, WDYT?
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-24035
>>> >
>>> > Regards,
>>> > Roman
>>> >
>>> > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>>> >
>>> > Hi, Weihua
>>> >
>>> > Thanks for your suggestions.
>>> >
>>> > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the total buffer is not enough?
>>> >
>>> > I think it's a good idea. I will try it and check the results in a PoC. Before all read buffers use floating buffers, I will try to use (ExclusiveBuffersPerChannel - i) buffers per channel first. For example, if the user has configured ExclusiveBuffersPerChannel to 4, it will check whether all read buffers are sufficient from 4 down to 1. Only when ExclusiveBuffersPerChannel of all channels is 1 and the read buffers are still insufficient will all read buffers use floating buffers. If the test results prove better, the FLIP will use this method.
>>> >
>>> > 2. Do we really need to change the default value of 'taskmanager.memory.network.max'?
>>> >
>>> > Changing taskmanager.memory.network.max will indeed affect some users, but a user is only affected when all 3 conditions are fulfilled:
>>> > 1) The Flink total TM memory is larger than 10g (because the network memory ratio is 0.1).
>>> > 2) taskmanager.memory.network.max was not initially configured.
>>> > 3) Other memory, such as managed memory or heap memory, is insufficient.
>>> > I think the number of jobs fulfilling these conditions is small, because when a TM uses such a large amount of memory, the network memory requirement may also be large. And when encountering the issue, the rollback is very simple: configure taskmanager.memory.network.max to 1g or another value.
>>> > In addition, the reason for modifying the default value is to simplify the network configurations in most scenarios. This change does affect a few usage scenarios, but we should admit that setting the default to any value may not meet the requirements of all scenarios.
>>> >
>>> > Best,
>>> > Yuxin
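The fine-grained fallback described in point 1 could look roughly like the sketch below. It is only an illustration under the assumptions stated in the comments (the class and method names and the simple availability check are made up; this is not the FLIP's actual implementation):

public class ExclusiveBufferFallbackSketch {

    // Minimal sketch of the fallback described above: try progressively fewer
    // exclusive buffers per channel, and only fall back to purely floating read
    // buffers when even 1 exclusive buffer per channel does not fit.
    static int chooseExclusiveBuffersPerChannel(
            int configuredBuffersPerChannel, // e.g. 4, the user-configured buffers-per-channel
            int numChannels,
            int floatingBuffersPerGate,
            int availableBuffers) {          // read buffers this gate may use in total
        for (int perChannel = configuredBuffersPerChannel; perChannel >= 1; perChannel--) {
            long required = (long) perChannel * numChannels + floatingBuffersPerGate;
            if (required <= availableBuffers) {
                return perChannel; // enough buffers for this exclusive setting
            }
        }
        return 0; // all read buffers become floating
    }

    public static void main(String[] args) {
        // 1000 channels, 4 exclusive buffers per channel configured, 8 floating,
        // but only 2500 buffers available for the gate -> settles on 2 per channel.
        System.out.println(chooseExclusiveBuffersPerChannel(4, 1000, 8, 2500));
    }
}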
>>> > Weihua Hu <huweihua....@gmail.com> wrote on Mon, Dec 26, 2022 at 20:35:
>>> >
>>> > Hi Yuxin,
>>> > Thanks for the proposal.
>>> >
>>> > "Insufficient number of network buffers" exceptions also bother us. It's too hard for users to figure out how many network buffers they really need. It relates to the partitioner type, the parallelism, and the slots per taskmanager.
>>> >
>>> > Since streaming jobs are our primary scenario, I have some questions about streaming jobs.
>>> >
>>> > 1. In this FLIP, all read buffers will use floating buffers when the total buffer is more than 'taskmanager.memory.network.read-required-buffer.max'. Competition in buffer allocation leads to performance regression. How about reducing ExclusiveBuffersPerChannel to 1 first when the total buffer is not enough? Will this reduce the performance regression in streaming?
>>> >
>>> > 2. Changing taskmanager.memory.network.max will affect user migration from lower versions. IMO, the network buffer size should not increase with total memory, especially for streaming jobs in application mode. For example, some ETL jobs with a rescale partitioner only require a few network buffers. And we already have 'taskmanager.memory.network.read-required-buffer.max' to control the maximum read network buffer usage. Do we really need to change the default value of 'taskmanager.memory.network.max'?
>>> >
>>> > Best,
>>> > Weihua
>>> >
>>> > On Mon, Dec 26, 2022 at 6:26 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>>> >
>>> > Hi, all
>>> > Thanks for the reply and feedback from everyone!
>>> >
>>> > After combining everyone's comments, the main concerns and the corresponding adjustments are as follows.
>>> >
>>> > @Guowei Ma, thanks for your feedback.
>>> > should we introduce a _new_ non-orthogonal option (`taskmanager.memory.network.required-buffer-per-gate.max`). That is to say, the option will affect both streaming and batch shuffle behavior at the same time.
>>> >
>>> > 1. Because the default option can meet most requirements, no matter in Streaming or Batch scenarios, we do not want users to adjust this default config option by design. This configuration option is added only to preserve the possibility of modification for users.
>>> > 2. In the few cases where users really want to adjust this option, they may not expect to adjust it according to Streaming or Batch, but, for example, according to the parallelism of the job.
>>> > 3. Regarding the performance of streaming shuffle, the same problem of insufficient memory also exists for Streaming jobs. We introduced this configuration to enable users to decouple memory and parallelism, although it will affect some performance. By default, the feature is disabled and does not affect performance. However, the added configuration enables users to choose to decouple memory usage and parallelism for Streaming jobs.
>>> >
>>> > It's better not to expose more implementation-related concepts to users.
>>> >
>>> > Thanks for your suggestion. I will modify the option name to avoid exposing implementation-related concepts. I have changed it to `taskmanager.memory.network.read-required-buffer.max` in the FLIP.
>>> >
>>> > @Dong Lin, thanks for your reply.
>>> > it might be helpful to add a dedicated public interface section to describe the config key and config semantics.
>>> >
>>> > Thanks for your suggestion. I have added a public interface section to describe the config key and config semantics clearly.
>>> >
>>> > This FLIP seems to add more configs without removing any config from Flink.
>>> >
>>> > This FLIP aims to reduce the number of options to be adjusted when using Flink. After this FLIP, the default options can meet the requirements in most scenarios without modifying any config options (`taskmanager.network.memory.buffers-per-channel` and `taskmanager.network.memory.floating-buffers-per-gate`), which is helpful to improve the out-of-the-box usability. In the long run, these two parameters, `taskmanager.network.memory.buffers-per-channel` and `taskmanager.network.memory.floating-buffers-per-gate`, may indeed be deprecated to reduce user parameters, but from the perspective of compatibility, we need to pay attention to users' feedback before deciding to deprecate the options.
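For readers following the option names, here is a hedged illustration of the configuration surface being discussed: the two existing advanced options versus the single new cap. The existing keys are current Flink options; the new key and all values below are taken from this discussion, may still change before the FLIP is finalized, and are not a recommended setting:

// Illustration only: the advanced knobs users tune today to avoid
// "Insufficient number of network buffers", versus the single (still
// experimental) cap proposed by the FLIP. Values are made-up examples.
import org.apache.flink.configuration.Configuration;

public class NetworkMemoryOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Today: per-channel / per-gate tuning, which requires understanding
        // the internal read buffer model.
        conf.setString("taskmanager.network.memory.buffers-per-channel", "2");
        conf.setString("taskmanager.network.memory.floating-buffers-per-gate", "8");

        // Proposed: one cap on the required read buffers per gate (the key was
        // later renamed to taskmanager.memory.network.read-buffer.required-per-gate.max
        // further up in this thread); enabled by default for batch, off for streaming.
        conf.setString("taskmanager.memory.network.read-required-buffer.max", "1000");

        System.out.println(conf);
    }
}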
>>> > @Yanfei Lei, thanks for your feedback.
>>> > 1. Though the option is cluster level, the default value is different according to the job type. In other words, by default, for Batch jobs the config value is enabled, 1000, and for Streaming jobs the config is not enabled by default.
>>> > 2. I think this is a good point. The total floating buffers will not change with ExclusiveBuffersPerChannel (taskmanager.network.memory.buffers-per-channel) because this is the maximum memory threshold. But if the user explicitly specifies ExclusiveBuffersPerChannel, the calculated result of ExclusiveBuffersPerChannel * numChannels will change with it.
>>> >
>>> > Thanks again for all the feedback!
>>> >
>>> > Best,
>>> > Yuxin
>>> >
>>> > Zhu Zhu <reed...@gmail.com> wrote on Mon, Dec 26, 2022 at 17:18:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for creating this FLIP.
>>> >
>>> > It's good if Flink does not require users to set a very large network memory, or to tune the advanced (hard-to-understand) per-channel/per-gate buffer configs, to avoid "Insufficient number of network buffers" exceptions, which can easily happen for large-scale jobs.
>>> >
>>> > Regarding the new config "taskmanager.memory.network.read-required-buffer.max", I think it's still an advanced config which users may find hard to tune. However, given that in most cases users will not need to set it, I think it's acceptable.
>>> >
>>> > So +1 for this FLIP.
>>> >
>>> > In the future, I think Flink should adaptively select whether to use exclusive buffers according to whether there are sufficient network buffers at runtime. Users then no longer need to understand the above configuration. This may require supporting transitions between exclusive buffers and floating buffers. A problem with all buffers floating is that too few network buffers can result in task slowness which is hard for users to identify. So improvements to the metrics and the web UI are also needed to expose such issues.
>>> >
>>> > Thanks,
>>> > Zhu
>>> >
>>> > Yanfei Lei <fredia...@gmail.com> wrote on Mon, Dec 26, 2022 at 11:13:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for the proposal!
>>> >
>>> > After reading the FLIP, I have some questions about the default value. This FLIP seems to introduce a *new* config option (taskmanager.memory.network.required-buffer-per-gate.max) to control the network memory usage.
>>> > 1. Is this configuration at the job level or the cluster level? As the FLIP describes, the default values of Batch jobs and Stream jobs are different. If an explicit value is set at the cluster level, will it affect all Batch jobs and Stream jobs on the cluster?
>>> > 2. The default value for Batch jobs depends on the value of ExclusiveBuffersPerChannel (taskmanager.network.memory.buffers-per-channel). If the value of ExclusiveBuffersPerChannel changes, does "taskmanager.memory.network.required-buffer-per-gate.max" need to change with it?
>>> >
>>> > Best,
>>> > Yanfei
>>> >
>>> > Dong Lin <lindon...@gmail.com> wrote on Sun, Dec 25, 2022 at 08:58:
>>> >
>>> > Hi Yuxin,
>>> >
>>> > Thanks for proposing the FLIP!
>>> >
>>> > The motivation section makes sense. But it seems that the proposed change section mixes the proposed config with the evaluation results. It is a bit hard to understand what configs are proposed and how to describe these configs to users. Given that the configuration setting is part of the public interfaces, it might be helpful to add a dedicated public interface section to describe the config key and config semantics, as suggested in the FLIP template here: <https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals>.
>>> >
>>> > This FLIP seems to add more configs without removing any config from Flink. Intuitively this can make the Flink configuration harder rather than simpler. Maybe we can get a better idea after we add a public interface section to clarify those configs.
>>> >
>>> > Thanks,
>>> > Dong
>>> >
>>> > On Mon, Dec 19, 2022 at 3:36 PM Yuxin Tan <tanyuxinw...@gmail.com> wrote:
>>> >
>>> > Hi, devs,
>>> >
>>> > I'd like to start a discussion about FLIP-266: Simplify network memory configurations for TaskManager [1].
>>> >
>>> > When using Flink, users may encounter the following issues that affect usability.
>>> > 1. The job may fail with an "Insufficient number of network buffers" exception.
>>> > 2. Flink network memory size adjustment is complex.
>>> > When encountering these issues, users can solve some problems by adding or adjusting parameters. However, multiple memory config options need to be changed, and adjusting them requires understanding the detailed internal implementation, which is impractical for most users.
>>> >
>>> > To simplify the network memory configurations for TaskManager and improve Flink usability, this FLIP proposes some optimization solutions for these issues.
>>> >
>>> > Looking forward to your feedback.
>>> >
>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-266%3A+Simplify+network+memory+configurations+for+TaskManager
>>> >
>>> > Best regards,
>>> > Yuxin