Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can request, right?
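For concreteness, here is a rough sketch of the limit discussed in this thread (hard-coding `maxOverdraftBuffers` from the subpartition count, with a floor for low-parallelism jobs and overdraft disabled when Unaligned Checkpoint is off, as proposed below). The class and method names are illustrative, not the actual Flink code:

```java
public final class OverdraftLimit {
    // Hypothetical helper; the real LocalBufferPool constructor differs.
    public static int maxOverdraftBuffers(int numberOfSubpartitions,
                                          boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // Overdraft is not useful for Aligned Checkpoint, so disable it.
            return 0;
        }
        // Floor of 10 so that low-parallelism jobs still get a useful budget.
        return Math.max(numberOfSubpartitions, 10);
    }
}
```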
I prefer to hard-code maxOverdraftBuffers so that we don't add a new
configuration option. I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

> Hi Anton Kalashnikov,
>
> Thanks for your very clear reply, I think you are totally right.
>
> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
> overdraft buffer, so it won't need a new buffer configuration. Flink
> users can increase maxBuffersNumber to control the overdraft buffer
> size.
>
> Also, I'd like to add some information. For safety, we should limit
> the maximum number of overdraft segments that each LocalBufferPool
> can request.
>
> Why do we limit it?
> Some operators don't check `recordWriter.isAvailable` while
> processing records, for example LegacySource. I mentioned this in
> FLINK-26759 [1]. I'm not sure if there are other cases.
>
> Without this limitation, the LegacySource would use up all remaining
> memory in the NetworkBufferPool when the backpressure is severe.
>
> How to limit it?
> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
> in the constructor of LocalBufferPool. maxOverdraftBuffers is just a
> safety limit, and it should be enough for most Flink jobs. Or we can
> set `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
> handle jobs of low parallelism.
>
> Also, if the user doesn't enable Unaligned Checkpoint, we can set
> maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
> the overdraft isn't useful for Aligned Checkpoint.
>
> Please correct me if I'm wrong. Thanks a lot.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26759
>
> Best wishes
> fanrui
>
> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
> wrote:
>
>> Hi fanrui,
>>
>> Thanks for creating the FLIP.
>>
>> In general, I think the overdraft is a good idea and it should help
>> in the cases described above.
>> Here are my thoughts about configuration:
>>
>> Please correct me if I am wrong, but as I understand it, right now we
>> have the following calculation:
>>
>> maxBuffersNumber (per TaskManager) = Network memory (calculated via
>> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
>> taskmanager.memory.network.max and the total memory size) /
>> taskmanager.memory.segment-size.
>>
>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>> parallelism + floating buffers) * number of subtasks in the
>> TaskManager
>>
>> buffersInUseNumber = the real number of buffers in use at the current
>> moment (always <= requiredBuffersNumber)
>>
>> Ideally, requiredBuffersNumber should be equal to maxBuffersNumber,
>> which allows Flink to work predictably. But if requiredBuffersNumber
>> > maxBuffersNumber, sometimes that is also fine (but not good), since
>> not all required buffers are really mandatory (e.g. it is ok if Flink
>> cannot allocate floating buffers).
>>
>> But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
>> Flink just never uses these leftover buffers (maxBuffersNumber -
>> requiredBuffersNumber), which I propose to use. (We can actually use
>> even the difference 'requiredBuffersNumber - buffersInUseNumber',
>> since one TaskManager can contain several operators, including
>> 'window', which can temporarily borrow buffers from the global pool.)
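Anton's calculation quoted above can be written down as a small sketch. The names are illustrative (the real values come from the `taskmanager.memory.network.*` and segment-size options), and this is just the arithmetic from the mail, not Flink's actual bookkeeping code:

```java
public final class NetworkBufferMath {
    // maxBuffersNumber (per TaskManager) = network memory / segment size.
    public static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    // requiredBuffersNumber (per TaskManager) =
    //   (exclusive buffers * parallelism + floating buffers) * subtasks in the TM.
    public static long requiredBuffersNumber(int exclusiveBuffersPerChannel,
                                             int parallelism,
                                             int floatingBuffersPerGate,
                                             int subtasksInTaskManager) {
        return (long) (exclusiveBuffersPerChannel * parallelism + floatingBuffersPerGate)
                * subtasksInTaskManager;
    }

    // The leftover that the overdraft proposal wants to draw on.
    public static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0, maxBuffers - requiredBuffers);
    }
}
```

For example, with 128 MB of network memory and 32 KB segments there are 4096 buffers; with 2 exclusive buffers per channel, parallelism 100, 8 floating buffers, and 16 subtasks per TaskManager, 3328 are required, leaving 768 buffers that the overdraft could use.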
>>
>> My proposal, more specifically (it relates only to requesting buffers
>> while processing a single record; switching to unavailability between
>> records should stay the same as we have it now):
>>
>> * If one more buffer is requested but maxBuffersPerChannel is
>> reached, then just ignore this limitation and allocate this buffer
>> from any place (from the LocalBufferPool if it still has something,
>> otherwise from the NetworkBufferPool).
>>
>> * If the LocalBufferPool exceeds its limit, then temporarily allocate
>> from the NetworkBufferPool while it has something to allocate.
>>
>> Maybe I missed something and this solution won't work, but I like it
>> since, on the one hand, it works from scratch without any
>> configuration, and on the other hand, it can be configured by
>> changing the proportion of maxBuffersNumber and requiredBuffersNumber.
>>
>> The last thing I want to say: I don't really want to implement a new
>> configuration option, since even now it is not clear how to correctly
>> configure network buffers with the existing configuration, and I
>> don't want to complicate it, especially if it is possible to resolve
>> the problem automatically (as described above).
>>
>> So is my understanding about network memory/buffers correct?
>>
>> --
>>
>> Best regards,
>> Anton Kalashnikov
>>
>> 27.04.2022 07:46, rui fan wrote:
>> > Hi everyone,
>> >
>> > Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
>> > effectively solves the problem of checkpoint timeout or slow
>> > checkpoints when backpressure is severe.
>> >
>> > We found that UC (Unaligned Checkpoint) does not work well when the
>> > backpressure is severe and multiple output buffers are required to
>> > process a single record. FLINK-14396 [2] also mentioned this issue
>> > before. So we propose the overdraft buffer to solve it.
>> >
>> > I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft
>> > buffer mechanism.
>> > After discussing with Anton Kalashnikov, there are still some
>> > points to discuss:
>> >
>> > * There are already a lot of buffer-related configuration options.
>> > Do we need to add a new one for the overdraft buffer?
>> > * Where should the overdraft buffer take its memory from?
>> > * If the overdraft buffer uses the memory remaining in the
>> > NetworkBufferPool, no new configuration needs to be added.
>> > * If we add a new configuration option:
>> > o Should we set overdraft-memory-size at the TM level or the
>> > Task level?
>> > o Or set overdraft-buffers to indicate the number of
>> > memory segments that can be overdrawn?
>> > o What is the default value? How do we set sensible defaults?
>> >
>> > Currently, I have implemented a POC [5] and verified it using
>> > flink-benchmarks [6]. The POC sets overdraft-buffers at the Task
>> > level, and the default value is 10. That is, each LocalBufferPool
>> > can overdraw up to 10 memory segments.
>> >
>> > Looking forward to your feedback!
>> >
>> > Thanks,
>> > fanrui
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> > [2] https://issues.apache.org/jira/browse/FLINK-14396
>> > [3] https://issues.apache.org/jira/browse/FLINK-26762
>> > [4]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> > [5]
>> https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> > [6] https://github.com/apache/flink-benchmarks/pull/54
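Anton's proposal earlier in the thread (ignore the maxBuffersPerChannel limit while a record is mid-processing, take from the LocalBufferPool if it still has segments, otherwise temporarily borrow from the NetworkBufferPool) amounts to the following fallback order. This is a deliberately simplified model with pools reduced to counters, not the actual LocalBufferPool implementation:

```java
public final class OverdraftRequest {
    private int localAvailable;    // segments left in the LocalBufferPool
    private int networkAvailable;  // segments left in the NetworkBufferPool

    public OverdraftRequest(int localAvailable, int networkAvailable) {
        this.localAvailable = localAvailable;
        this.networkAvailable = networkAvailable;
    }

    // While processing a single record, the maxBuffersPerChannel limit is
    // ignored: take from the local pool if it still has segments, otherwise
    // temporarily borrow (overdraft) from the global network pool.
    public String requestDuringRecord() {
        if (localAvailable > 0) {
            localAvailable--;
            return "local";
        }
        if (networkAvailable > 0) {
            networkAvailable--;
            return "network";
        }
        return "none"; // nothing left anywhere: the task has to wait
    }
}
```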