Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can request, right?

I prefer to hard-code the maxOverdraftBuffers so that we don't add a new
configuration option. And I hope to hear more from the community.
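To make the intent concrete, here is a minimal sketch of the hard-coded rule I described in my earlier mail (quoted below); the class and method names are hypothetical, not the actual LocalBufferPool API:

```java
// Hypothetical helper; the real LocalBufferPool constructor differs.
final class OverdraftPolicy {

    /** Hard-coded overdraft limit, so no new configuration option is needed. */
    static int maxOverdraftBuffers(int numberOfSubpartitions,
                                   boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // The overdraft only helps Unaligned Checkpoint.
            return 0;
        }
        // A floor of 10 segments guards jobs with low parallelism.
        return Math.max(numberOfSubpartitions, 10);
    }
}
```

For example, a job with 4 subpartitions and Unaligned Checkpoint enabled would get 10 overdraft segments, while a job without Unaligned Checkpoint gets none.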

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

> Hi Anton Kalashnikov,
>
> Thanks for your very clear reply, I think you are totally right.
>
> The 'maxBuffersNumber - buffersInUseNumber' buffers can be used as the
> overdraft buffer, so it won't need a new buffer configuration. Flink users
> can increase the maxBuffersNumber to control the overdraft buffer size.
>
> Also, I'd like to add some information. For safety, we should limit the
> maximum number of overdraft segments that each LocalBufferPool
> can request.
>
> Why do we limit it?
> Some operators, such as LegacySource, don't check
> `recordWriter.isAvailable` while processing records. I mentioned this in
> FLINK-26759 [1]. I'm not sure if there are other cases.
>
> If we don't add this limitation, the LegacySource will use up all the
> remaining memory in the NetworkBufferPool when the backpressure is severe.
>
> How to limit it?
> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
> in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
> for safety, and it should be enough for most Flink jobs. Or we can set
> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
> jobs with low parallelism.
>
> Also, if users don't enable Unaligned Checkpoint, we can set
> maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
> the overdraft isn't useful for Aligned Checkpoint.
>
> Please correct me if I'm wrong. Thanks a lot.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26759
>
> Best wishes
> fanrui
>
> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
> wrote:
>
>> Hi fanrui,
>>
>> Thanks for creating the FLIP.
>>
>> In general, I think the overdraft is a good idea and it should help in
>> the cases described above. Here are my thoughts about configuration:
>>
>> Please correct me if I am wrong, but as I understand it, right now we
>> have the following calculation.
>>
>> maxBuffersNumber(per TaskManager) = Network memory (calculated via
>> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
>> taskmanager.memory.network.max and total memory size) /
>> taskmanager.memory.segment-size.
>>
>> requiredBuffersNumber(per TaskManager) = (exclusive buffers *
>> parallelism + floating buffers) * number of subtasks in the TaskManager
>>
>> buffersInUseNumber = the real number of buffers in use at the current
>> moment (always <= requiredBuffersNumber)
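These three quantities can be sketched in code as follows; the class, method names, and example numbers are illustrative, not actual Flink APIs or configuration keys:

```java
// Illustrative helper for the calculation above; not a real Flink class.
final class BufferMath {

    /** maxBuffersNumber = network memory / taskmanager.memory.segment-size. */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffersNumber(int exclusivePerChannel, int parallelism,
                                      int floatingBuffers, int subtasksInTaskManager) {
        return (long) (exclusivePerChannel * parallelism + floatingBuffers)
                * subtasksInTaskManager;
    }
}
```

For example, 128 MiB of network memory with 32 KiB segments gives maxBuffersNumber = 4096, while 2 exclusive buffers, parallelism 100, 8 floating buffers, and 4 subtasks give requiredBuffersNumber = 832, leaving 3264 leftover buffers.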
>>
>> Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which
>> allows Flink to work predictably. But if requiredBuffersNumber >
>> maxBuffersNumber, sometimes that is also fine (though not good), since not
>> all required buffers are really mandatory (e.g. it is ok if Flink cannot
>> allocate floating buffers).
>>
>> But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
>> Flink just never uses these leftover buffers (maxBuffersNumber -
>> requiredBuffersNumber), which I propose to use. (We can actually even use
>> the difference 'requiredBuffersNumber - buffersInUseNumber', since one
>> TaskManager may contain several operators, including 'window', which can
>> temporarily borrow buffers from the global pool.)
>>
>> My proposal, more specifically (it relates only to requesting buffers
>> while processing a single record; switching to unavailability between
>> records should stay the same as it is now):
>>
>> * If one more buffer is requested but maxBuffersPerChannel is reached,
>> then just ignore this limitation and allocate the buffer from any
>> place (from the LocalBufferPool if it still has something, otherwise from
>> the NetworkBufferPool)
>>
>> * If the LocalBufferPool exceeds its limit, then temporarily allocate
>> from the NetworkBufferPool while it has something to allocate
>>
>>
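The two fallback rules above could be sketched like this; the pool interfaces here are hypothetical stand-ins, not the real LocalBufferPool/NetworkBufferPool APIs:

```java
import java.util.Optional;

// Hypothetical stand-ins for the real buffer pools.
interface SegmentSource {
    Optional<MemorySegment> tryRequest();
}

final class MemorySegment {}

final class OverdraftingRequester {
    private final SegmentSource localPool;   // LocalBufferPool stand-in
    private final SegmentSource globalPool;  // NetworkBufferPool stand-in

    OverdraftingRequester(SegmentSource localPool, SegmentSource globalPool) {
        this.localPool = localPool;
        this.globalPool = globalPool;
    }

    /**
     * While emitting a single record: ignore maxBuffersPerChannel, take a
     * buffer from the local pool if it still has one, otherwise fall back
     * to the global pool while it has something to allocate.
     */
    Optional<MemorySegment> requestDuringRecord() {
        Optional<MemorySegment> segment = localPool.tryRequest();
        return segment.isPresent() ? segment : globalPool.tryRequest();
    }
}
```

Between records, availability checks would stay exactly as they are today; only the in-record path gets this overdraft fallback.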
>> Maybe I missed something and this solution won't work, but I like it
>> since, on the one hand, it works out of the box without any
>> configuration; on the other hand, it can be configured by changing the
>> proportion of maxBuffersNumber and requiredBuffersNumber.
>>
>> The last thing that I want to say: I don't really want to implement a new
>> configuration option, since even now it is not clear how to correctly
>> configure network buffers with the existing configuration, and I don't
>> want to complicate it further, especially if the problem can be resolved
>> automatically (as described above).
>>
>>
>> So is my understanding about network memory/buffers correct?
>>
>> --
>>
>> Best regards,
>> Anton Kalashnikov
>>
>> 27.04.2022 07:46, rui fan wrote:
>> > Hi everyone,
>> >
>> > Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
>> > effectively solves the problem of checkpoint timeout or slow
>> > checkpoint when backpressure is severe.
>> >
>> > We found that UC (Unaligned Checkpoint) does not work well when the
>> > backpressure is severe and multiple output buffers are required to
>> > process a single record. FLINK-14396 [2] also mentioned this issue
>> > before. So we propose the overdraft buffer to solve it.
>> >
>> > I created FLINK-26762[3] and FLIP-227[4] to detail the overdraft
>> > buffer mechanism. After discussing with Anton Kalashnikov, there are
>> > still some points to discuss:
>> >
>> >   * There are already a lot of buffer-related configurations. Do we
>> >     need to add a new configuration for the overdraft buffer?
>> >   * Where should the overdraft buffer take memory from?
>> >   * If the overdraft-buffer uses the memory remaining in the
>> >     NetworkBufferPool, no new configuration needs to be added.
>> >   * If adding a new configuration:
>> >       o Should we set the overdraft-memory-size at the TM level or the
>> >         Task level?
>> >       o Or set overdraft-buffers to indicate the number of
>> >         memory-segments that can be overdrawn.
>> >       o What is the default value? How to set sensible defaults?
>> >
>> > Currently, I implemented a POC [5] and verified it using
>> > flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
>> > and the default value is 10. That is, each LocalBufferPool can overdraw
>> > up to 10 memory segments.
>> >
>> > Looking forward to your feedback!
>> >
>> > Thanks,
>> > fanrui
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> > [2] https://issues.apache.org/jira/browse/FLINK-14396
>> > [3] https://issues.apache.org/jira/browse/FLINK-26762
>> > [4]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> > [5]
>> >
>> https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> > [6] https://github.com/apache/flink-benchmarks/pull/54
>>
>>
