Hi Anton, Piotrek and Dawid,

Thanks for your help.

I created FLINK-27522 [1] as the first task, and I will finish it ASAP.

@Piotrek, for the default value, do you think it should be less
than 5? What do you think about 3? Actually, I think 5 isn't big.
Whether it's 1, 3 or 5 doesn't matter much; the focus is on
reasonably resolving the deadlock problem. Alternatively, I can move
forward with the second task first, and we can discuss the default
value in the PR.
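
To make the trade-off around the default value concrete, here is a minimal
sketch of a bounded overdraft. It is a toy model under stated assumptions,
not the actual LocalBufferPool code: the class OverdraftPoolSketch, its
fields and requestBuffer() are hypothetical names used only for
illustration, and the shared global pool is reduced to a single counter.

```java
/** Toy model of a bounded overdraft; NOT Flink's LocalBufferPool. All names are hypothetical. */
class OverdraftPoolSketch {
    private final int poolSize;      // regular buffers this pool may hold
    private final int maxOverdraft;  // cap on extra buffers (the default under discussion)
    private int globalFree;          // stands in for free buffers in the shared global pool
    private int used;                // regular + overdraft buffers currently held

    OverdraftPoolSketch(int poolSize, int maxOverdraft, int globalFree) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
        this.globalFree = globalFree;
    }

    /** Availability ignores the overdraft: it only says whether a regular buffer is left. */
    boolean isAvailable() {
        return used < poolSize;
    }

    /** Returns true if a buffer was granted, false where a real pool would block. */
    boolean requestBuffer() {
        if (used < poolSize) {
            used++;
            return true;
        }
        int overdraftUsed = used - poolSize;
        if (overdraftUsed < maxOverdraft && globalFree > 0) {
            globalFree--; // overdraft buffers are borrowed from the shared pool
            used++;
            return true;
        }
        return false;
    }

    int globalFree() {
        return globalFree;
    }

    public static void main(String[] args) {
        // 2 regular buffers, overdraft capped at 3, 10 free buffers in the shared pool.
        OverdraftPoolSketch pool = new OverdraftPoolSketch(2, 3, 10);
        int granted = 0;
        while (pool.requestBuffer()) {
            granted++;
        }
        System.out.println("granted=" + granted + ", globalFree=" + pool.globalFree());
    }
}
```

In this sketch a single pool can take at most maxOverdraft buffers from the
shared counter, so a small default limits how much one subtask can drain
from the memory other tasks need to start.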

For the legacy source, I got your idea. I propose we create a
third task to handle it, because it is independent and exists for
compatibility with the old API. What do you think? I added
the third task to FLIP-227 [2].
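
A minimal sketch of how the third task could look, assuming the
disableOverdraft() call suggested in Piotrek's mail below. Everything else
here (SwitchablePoolSketch, LegacySourceTaskSketch, RegularTaskSketch) is a
hypothetical stand-in and not existing Flink code:

```java
/** Hypothetical pool that lets a task opt out of the overdraft; not Flink code. */
class SwitchablePoolSketch {
    private int maxOverdraft;

    SwitchablePoolSketch(int maxOverdraft) {
        this.maxOverdraft = maxOverdraft;
    }

    /** Opting out is modelled as dropping the overdraft limit to zero. */
    void disableOverdraft() {
        maxOverdraft = 0;
    }

    int maxOverdraft() {
        return maxOverdraft;
    }
}

/** Stand-in for a legacy source task: it never checks availability, so it opts out up front. */
class LegacySourceTaskSketch {
    LegacySourceTaskSketch(SwitchablePoolSketch pool) {
        pool.disableOverdraft();
    }
}

/** Stand-in for any other task: it leaves the configured overdraft untouched. */
class RegularTaskSketch {
    RegularTaskSketch(SwitchablePoolSketch pool) {
        // Intentionally empty: regular tasks keep the configured limit.
    }
}

class LegacyOverdraftDemo {
    public static void main(String[] args) {
        SwitchablePoolSketch legacyPool = new SwitchablePoolSketch(5);
        new LegacySourceTaskSketch(legacyPool);
        SwitchablePoolSketch regularPool = new SwitchablePoolSketch(5);
        new RegularTaskSketch(regularPool);
        System.out.println("legacy=" + legacyPool.maxOverdraft()
                + ", regular=" + regularPool.maxOverdraft());
    }
}
```

Keeping the switch in the task rather than in the configuration means users
of the old API do not have to set anything themselves.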

If everything is OK, I will create a JIRA for the third task and add it to
FLIP-227. I will then develop them in order, from the first task to the
third.

Thanks again for your help.

[1] https://issues.apache.org/jira/browse/FLINK-27522
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer

Thanks
fanrui

On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi fanrui,
>
> > How to identify legacySource?
>
> legacy sources are always using the SourceStreamTask class and
> SourceStreamTask is used only for legacy sources. But I'm not sure how to
> enable/disable that. Adding `disableOverdraft()` call in SourceStreamTask
> would be better compared to relying on the `getAvailableFuture()` call
> (isn't it used for back pressure metric anyway?). Ideally we should
> enable/disable it in the constructors, but that might be tricky.
>
> > I prefer it to be between 5 and 10
>
> I would vote for a smaller value because of FLINK-13203
>
> Piotrek
>
>
>
> On Thu, May 5, 2022 at 11:49, rui fan <1996fan...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks a lot for your discussion.
>>
>> After several discussions, I think it's clear now. I updated the
>> "Proposed Changes" of FLIP-227[1]. If I have something
>> missing, please help to add it to FLIP, or add it in the mail
>> and I can add it to FLIP. If everything is OK, I will create a
>> new JIRA for the first task, and use FLINK-26762[2] as the
>> second task.
>>
>> About the legacy source, do we set maxOverdraftBuffersPerGate=0
>> directly? How to identify legacySource? Or could we add
>> an overdraftEnabled flag to LocalBufferPool? Its default value would
>> be false, and a call to getAvailableFuture would set
>> overdraftEnabled=true, because that call indicates that isAvailable
>> is checked elsewhere. This might be more general and cover more cases.
>>
>> Also, I think the default value of 'max-overdraft-buffers-per-gate'
>> needs to be confirmed. I prefer it to be between 5 and 10. What
>> do you think?
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> [2] https://issues.apache.org/jira/browse/FLINK-26762
>>
>> Thanks
>> fanrui
>>
>> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi again,
>>>
>>> After sleeping over this, if both versions (reserve and overdraft) have
>>> the same complexity, I would also prefer the overdraft.
>>>
>>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
>>> > Dawid mentioned, I think it is dangerous since it is too implicit for
>>> > the user and if the user submits one more job for the same TaskManager
>>>
>>> As I mentioned, it's not only an issue with multiple jobs. The same
>>> problem can happen with different subtasks from the same job, potentially
>>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be
>>> in favour of Integer.MAX_VALUE to be the default value, but as it is, I
>>> think we should indeed play on the safe side and limit it.
>>>
>>> > I still don't understand how should be limited "reserve"
>>> implementation.
>>> > I mean if we have X buffers in total and the user sets overdraft equal
>>> > to X we obviously can not reserve all buffers, but how many we are
>>> > allowed to reserve? Should it be a different configuration like
>>> > percentegeForReservedBuffers?
>>>
>>> The reserve could be defined as percentage, or as a fixed number of
>>> buffers. But yes. In normal operation a subtask would not use the reserve, because
>>> if numberOfAvailableBuffers < reserve, the output would not be available.
>>> Only in the flatMap/timers/huge records case the reserve could be used.
>>>
>>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers,
>>> will LocalBufferPool never be available? Can't process data?
>>>
>>> Of course we would need to make sure that never happens. So the reserve
>>> should be < total buffer size.
>>>
>>> > 2. If the overdraft buffer use the extra buffers, when the downstream
>>> > task inputBuffer is insufficient, it should fail to start the job, and
>>> then
>>> > restart? When the InputBuffer is initialized, it will apply for enough
>>> > buffers, right?
>>>
>>> The failover if downstream can not allocate buffers is already
>>> implemented in FLINK-14872 [2]. There is a timeout for how long the task is
>>> waiting for buffer allocation. However, this doesn't prevent many
>>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
>>> solution for [1] would be 2b described in the ticket:
>>>
>>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This
>>> is a simplified version of 2a, without tracking the tasks sink-to-source.
>>>
>>> But that's a pre-existing problem and I don't think we have to solve it
>>> before implementing overdraft. I think we would need to solve it only
>>> before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I
>>> would hesitate setting the overdraft to anything more than a couple of
>>> buffers by default for the same reason.
>>>
>>> > Actually, I totally agree that we don't need a lot of buffers for
>>> overdraft
>>>
>>> and
>>>
>>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>>> > When we finish this feature and after users use it, if users feedback
>>> > this issue we can discuss again.
>>>
>>> +1
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-13203
>>> [2] https://issues.apache.org/jira/browse/FLINK-14872
>>>
>>> On Thu, May 5, 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I still have some questions.
>>>>
>>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
>>>> LocalBufferPool never be available? Can't process data?
>>>> 2. If the overdraft buffer use the extra buffers, when the downstream
>>>> task inputBuffer is insufficient, it should fail to start the job, and
>>>> then
>>>> restart? When the InputBuffer is initialized, it will apply for enough
>>>> buffers, right?
>>>>
>>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>>>> When we finish this feature and after users use it, if users feedback
>>>> this issue we can discuss again.
>>>>
>>>> Thanks
>>>> fanrui
>>>>
>>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I have not replied in the thread yet, but I was following the
>>>>> discussion.
>>>>>
>>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it,
>>>>> the idea to distinguish between inside flatMap & outside would be fairly
>>>>> simple, but maybe slightly indirect. The checkAvailability would remain
>>>>> unchanged and it is checked always between separate invocations of the
>>>>> UDF. Therefore the overdraft buffers would not apply there. However, once
>>>>> the pool says it is available, it means it has at least an initial
>>>>> buffer. So any additional request without checking for availability can
>>>>> be considered to be inside of processing a single record. This does not
>>>>> hold just for the LegacySource as I don't think it actually checks for
>>>>> the availability of buffers in the LocalBufferPool.
>>>>>
>>>>> In the offline chat with Anton, we also discussed if we need a limit of
>>>>> the number of buffers we could overdraft (or in other words if the limit
>>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
>>>>> on the safe side and have it limited. The pool of network buffers is
>>>>> shared for the entire TaskManager, so it means it can be shared even
>>>>> across tasks of separate jobs. However, I might be just unnecessarily
>>>>> cautious here.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>>>>> > Hi,
>>>>> >
>>>>> > Thanks for the answers.
>>>>> >
>>>>> >> we may still need to discuss whether the
>>>>> >> overdraft/reserve/spare should use extra buffers or buffers
>>>>> >> in (exclusive + floating buffers)?
>>>>> > and
>>>>> >
>>>>> >> These things resolve the different problems (at least as I see
>>>>> that).
>>>>> >> The current hardcoded "1"  says that we switch "availability" to
>>>>> >> "unavailability" when one more buffer is left(actually a little less
>>>>> >> than one buffer since we write the last piece of data to this last
>>>>> >> buffer). The overdraft feature doesn't change this logic we still
>>>>> want
>>>>> >> to switch to "unavailability" in such a way but if we are already in
>>>>> >> "unavailability" and we want more buffers then we can take
>>>>> "overdraft
>>>>> >> number" more. So we can not avoid this hardcoded "1" since we need
>>>>> to
>>>>> >> understand when we should switch to "unavailability"
>>>>> > Ok, I see. So it seems to me that both of you have in mind to keep
>>>>> the
>>>>> > buffer pools as they are right now, but if we are in the middle of
>>>>> > processing a record, we can request extra overdraft buffers on top of
>>>>> > those? This is another way to implement the overdraft to what I was
>>>>> > thinking. I was thinking about something like keeping the
>>>>> "overdraft" or
>>>>> > more precisely buffer "reserve" in the buffer pool. I think my
>>>>> version
>>>>> > would be easier to implement, because it is just fiddling with
>>>>> min/max
>>>>> > buffers calculation and slightly modified `checkAvailability()`
>>>>> logic.
>>>>> >
>>>>> > On the other hand  what you have in mind would better utilise the
>>>>> available
>>>>> > memory, right? It would require more code changes (how would we know
>>>>> when
>>>>> > we are allowed to request the overdraft?). However, in this case, I
>>>>> would
>>>>> > be tempted to set the number of overdraft buffers by default to
>>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers as
>>>>> > necessary. The only downside that I can think of (apart from higher
>>>>> > complexity) would be higher chance of hitting a known/unsolved
>>>>> deadlock [1]
>>>>> > in a scenario:
>>>>> > - downstream task hasn't yet started
>>>>> > - upstream task requests overdraft and uses all available memory
>>>>> segments
>>>>> > from the global pool
>>>>> > - upstream task is blocked, because downstream task hasn't started
>>>>> yet and
>>>>> > can not consume any data
>>>>> > - downstream task tries to start, but can not, as there are no
>>>>> available
>>>>> > buffers
>>>>> >
>>>>> >> BTW, for watermark, the number of buffers it needs is
>>>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>>>> >> the watermark won't block in requestMemory.
>>>>> > and
>>>>> >
>>>>> >> the best overdraft size will be equal to parallelism.
>>>>> > That's a lot of buffers. I don't think we need that many for
>>>>> broadcasting
>>>>> > watermarks. Watermarks are small, and remember that every
>>>>> subpartition has
>>>>> > some partially filled/empty WIP buffer, so the vast majority of
>>>>> > subpartitions will not need to request a new buffer.
>>>>> >
>>>>> > Best,
>>>>> > Piotrek
>>>>> >
>>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>>>>> >
>>>>> > On Tue, May 3, 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >>
>>>>> >>   >> Do you mean to ignore it while processing records, but keep
>>>>> using
>>>>> >> `maxBuffersPerChannel` when calculating the availability of the
>>>>> output?
>>>>> >>
>>>>> >>
>>>>> >> Yes, it is correct.
>>>>> >>
>>>>> >>
>>>>> >>   >> Would it be a big issue if we changed it to check if at least
>>>>> >> "overdraft number of buffers are available", where "overdraft
>>>>> number" is
>>>>> >> configurable, instead of the currently hardcoded value of "1"?
>>>>> >>
>>>>> >>
>>>>> >> These things resolve the different problems (at least as I see
>>>>> that).
>>>>> >> The current hardcoded "1"  says that we switch "availability" to
>>>>> >> "unavailability" when one more buffer is left(actually a little less
>>>>> >> than one buffer since we write the last piece of data to this last
>>>>> >> buffer). The overdraft feature doesn't change this logic we still
>>>>> want
>>>>> >> to switch to "unavailability" in such a way but if we are already in
>>>>> >> "unavailability" and we want more buffers then we can take
>>>>> "overdraft
>>>>> >> number" more. So we can not avoid this hardcoded "1" since we need
>>>>> to
>>>>> >> understand when we should switch to "unavailability"
>>>>> >>
>>>>> >>
>>>>> >> -- About "reserve" vs "overdraft"
>>>>> >>
>>>>> >> As Fanrui mentioned above, perhaps, the best overdraft size will be
>>>>> >> equal to parallelism. Also, the user can set any value he wants. So
>>>>> even
>>>>> >> if parallelism is small(~5) but the user's flatmap produces a lot of
>>>>> >> data, the user can set 10 or even more, which almost doubles the max
>>>>> >> buffers and it will be impossible to reserve. At least we need to
>>>>> figure
>>>>> >> out how to protect from such cases (the limit for an overdraft?). So
>>>>> >> actually it looks even more difficult than increasing the maximum
>>>>> buffers.
>>>>> >>
>>>>> >> I want to emphasize that overdraft buffers are soft configuration
>>>>> which
>>>>> >> means it takes as many buffers as the global buffers pool has
>>>>> >> available(maybe zero) but less than this configured value. It is
>>>>> also
>>>>> >> important to notice that perhaps, not many subtasks in TaskManager
>>>>> will
>>>>> >> be using this feature so we don't actually need a lot of available
>>>>> >> buffers for every subtask(Here, I mean that if we have only one
>>>>> >> window/flatmap operator and many other operators, then one
>>>>> TaskManager
>>>>> >> will have many ordinary subtasks which don't actually need
>>>>> overdraft and
>>>>> >> several subtasks that need this feature). But in case of
>>>>> reservation,
>>>>> >> we will reserve some buffers for all operators even if they don't
>>>>> really
>>>>> >> need it.
>>>>> >>
>>>>> >>
>>>>> >> -- Legacy source problem
>>>>> >>
>>>>> >> If we still want to change max buffers then it is a problem for
>>>>> >> LegacySources(since every subtask of source will always use these
>>>>> >> overdraft). But right now, I think that we can force to set 0
>>>>> overdraft
>>>>> >> buffers for legacy subtasks in configuration during execution(if it
>>>>> is
>>>>> >> not too late for changing configuration in this place).
>>>>> >>
>>>>> >>
>>>>> >> On 03.05.2022 14:11, rui fan wrote:
>>>>> >>> Hi
>>>>> >>>
>>>>> >>> Thanks for Martijn Visser and Piotrek's feedback.  I agree with
>>>>> >>> ignoring the legacy source, it will affect our design. User should
>>>>> >>> use the new Source Api as much as possible.
>>>>> >>>
>>>>> >>> Hi Piotrek, we may still need to discuss whether the
>>>>> >>> overdraft/reserve/spare should use extra buffers or buffers
>>>>> >>> in (exclusive + floating buffers)? They have some differences.
>>>>> >>>
>>>>> >>> If it uses extra buffers:
>>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1
>>>>> >>>    <= currentPoolSize) and all subpartitions don't reach the
>>>>> >>> maxBuffersPerChannel.
>>>>> >>>
>>>>> >>> If it uses the buffers in (exclusive + floating buffers):
>>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
>>>>> >>> don't reach the maxBuffersPerChannel.
>>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the
>>>>> >>> usedBuffers will be small. That is the LocalBufferPool will be
>>>>> >>> easily unavailable. For throughput, if users turn up the
>>>>> >>> overdraft buffers, they need to turn up exclusive or floating
>>>>> >>> buffers. It also affects the InputChannel, and it is unfriendly
>>>>> >>> to users.
>>>>> >>>
>>>>> >>> So I prefer the overdraft to use extra buffers.
>>>>> >>>
>>>>> >>>
>>>>> >>> BTW, for watermark, the number of buffers it needs is
>>>>> >>> numberOfSubpartitions. So if
>>>>> overdraftBuffers=numberOfSubpartitions,
>>>>> >>> the watermark won't block in requestMemory. But it has
>>>>> >>> 2 problems:
>>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
>>>>> >>> (exclusive + floating buffers),  there will be fewer buffers
>>>>> >>> available. Throughput may be affected.
>>>>> >>> 2. The numberOfSubpartitions is different for each Task.
>>>>> >>> So if users want to cover watermark using this feature,
>>>>> >>> they don't know how to set the overdraftBuffers more
>>>>> >>> reasonably. And if the parallelism is changed, users still
>>>>> >>> need to change overdraftBuffers. It is unfriendly to users.
>>>>> >>>
>>>>> >>> So I propose we support overdraftBuffers=-1. It means
>>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>>>>> >>> in the Constructor of LocalBufferPool.
>>>>> >>>
>>>>> >>> Please correct me if I'm wrong.
>>>>> >>>
>>>>> >>> Thanks
>>>>> >>> fanrui
>>>>> >>>
>>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <
>>>>> pnowoj...@apache.org>
>>>>> >> wrote:
>>>>> >>>> Hi fanrui,
>>>>> >>>>
>>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>>>> >> buffers *
>>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be
>>>>> available
>>>>> >>>> when
>>>>> >>>>> (usedBuffers+overdraftBuffers <=
>>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers)
>>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel,
>>>>> right?
>>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum
>>>>> number of
>>>>> >> the
>>>>> >>>> required buffers, just as we did when we were implementing the non
>>>>> >> blocking
>>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back
>>>>> then we
>>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this
>>>>> logic is
>>>>> >>>> located in
>>>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>>>>> >>>>
>>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers :
>>>>> numSubpartitions + 1;
>>>>> >>>> For performance reasons, we always require at least one buffer per
>>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we
>>>>> >> require 5
>>>>> >>>> overdraft buffers for output to be available, we need to have
>>>>> them on
>>>>> >> top
>>>>> >>>> of those "one buffer per sub-partition". So the logic should be
>>>>> changed
>>>>> >> to:
>>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers :
>>>>> numSubpartitions +
>>>>> >>>> numOverdraftBuffers;
>>>>> >>>>
>>>>> >>>> Regarding increasing the number of max buffers I'm not sure. As
>>>>> long as
>>>>> >>>> "overdraft << max number of buffers", because all buffers on the
>>>>> outputs
>>>>> >>>> are shared across all sub-partitions. If we have 5 overdraft
>>>>> buffers,
>>>>> >> and
>>>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of
>>>>> things if
>>>>> >> we
>>>>> >>>> make the output available if at least one single buffer is
>>>>> available or
>>>>> >> at
>>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So
>>>>> effects of
>>>>> >>>> increasing the overdraft from 1 to for example 5 should be
>>>>> negligible.
>>>>> >> For
>>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 still
>>>>> >> increases
>>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max as
>>>>> it is?
>>>>> >>>>
>>>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer
>>>>> >> reserve"
>>>>> >>>> or "spare buffers"? And document it as "number of buffers kept in
>>>>> >> reserve
>>>>> >>>> in case of flatMap/firing timers/huge records"?
>>>>> >>>>
>>>>> >>>> What do you think, Fanrui, Anton?
>>>>> >>>>
>>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new
>>>>> >> features,
>>>>> >>>> as long as we don't break the existing deployments too much.
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Piotrek
>>>>> >>>>
>>>>> >>>> On Tue, May 3, 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>>>>> >>>>> Hi everyone,
>>>>> >>>>>
>>>>> >>>>> Just wanted to chip in on the discussion of legacy sources:
>>>>> IMHO, we
>>>>> >>>> should
>>>>> >>>>> not focus too much on improving/adding capabilities for legacy
>>>>> sources.
>>>>> >>>> We
>>>>> >>>>> want to persuade and push users to use the new Source API. Yes,
>>>>> this
>>>>> >>>> means
>>>>> >>>>> that there's work required by the end users to port any custom
>>>>> source
>>>>> >> to
>>>>> >>>>> the new interface. The benefits of the new Source API should
>>>>> outweigh
>>>>> >>>> this.
>>>>> >>>>> Anything that we build to support multiple interfaces means
>>>>> adding more
>>>>> >>>>> complexity and more possibilities for bugs. Let's try to make our
>>>>> >> lives a
>>>>> >>>>> little bit easier.
>>>>> >>>>>
>>>>> >>>>> Best regards,
>>>>> >>>>>
>>>>> >>>>> Martijn Visser
>>>>> >>>>> https://twitter.com/MartijnVisser82
>>>>> >>>>> https://github.com/MartijnVisser
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com>
>>>>> wrote:
>>>>> >>>>>
>>>>> >>>>>> Hi Piotrek
>>>>> >>>>>>
>>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
>>>>> using
>>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>>>> >>>> output?
>>>>> >>>>>> I think yes; Anton Kalashnikov, please help double-check.
>>>>> >>>>>>
>>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a
>>>>> big
>>>>> >>>>> problem
>>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>>>>> have
>>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>>>> >>>> buffer
>>>>> >>>>>>> available and this works the same for all tasks (including
>>>>> legacy
>>>>> >>>>> source
>>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if
>>>>> at least
>>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>>>>> number"
>>>>> >>>>> is
>>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>>>> >>>> buffers *
>>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be
>>>>> available
>>>>> >>>>> when
>>>>> >>>>>> (usedBuffers+overdraftBuffers <=
>>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers)
>>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel,
>>>>> right?
>>>>> >>>>>>
>>>>> >>>>>> If yes, I think it can solve the problem of legacy source.
>>>>> There may
>>>>> >> be
>>>>> >>>>>> some impact. If overdraftBuffers is large and only one buffer
>>>>> is used
>>>>> >>>> to
>>>>> >>>>>> process a single record, exclusive buffers*parallelism +
>>>>> floating
>>>>> >>>> buffers
>>>>> >>>>>> cannot be used. It may only be possible to use (exclusive
>>>>> buffers *
>>>>> >>>>>> parallelism
>>>>> >>>>>> + floating buffers - overdraft buffers + 1). For throughput, if
>>>>> turn
>>>>> >> up
>>>>> >>>>> the
>>>>> >>>>>> overdraft buffers, the flink user needs to turn up exclusive or
>>>>> >>>> floating
>>>>> >>>>>> buffers. And it also affects the InputChannel.
>>>>> >>>>>>
>>>>> >>>>>> If not, I don't think it can solve the problem of legacy source. The
>>>>> >>>>>> legacy source doesn't check isAvailable. If there are extra buffers,
>>>>> >>>>>> the legacy source will use them up until it blocks in requestMemory.
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> Thanks
>>>>> >>>>>> fanrui
>>>>> >>>>>>
>>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <
>>>>> pnowoj...@apache.org>
>>>>> >>>>>> wrote:
>>>>> >>>>>>
>>>>> >>>>>>> Hi,
>>>>> >>>>>>>
>>>>> >>>>>>> +1 for the general proposal from my side. It would be a nice
>>>>> >>>>>>> workaround for the flatMap, WindowOperator and large-record issues
>>>>> >>>>>>> with unaligned checkpoints.
>>>>> >>>>>>>
>>>>> >>>>>>>> The first task is about ignoring max buffers per channel. This
>>>>> >>>> means
>>>>> >>>>> if
>>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the
>>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just
>>>>> ignore
>>>>> >>>> that
>>>>> >>>>>>>> and continue to allocate buffers while LocalBufferPool has any (it is
>>>>> >>>>>>>> actually not an overdraft).
>>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
>>>>> using
>>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>>>> >>>> output?
>>>>> >>>>>>>> The second task is about the real overdraft. I am pretty
>>>>> convinced
>>>>> >>>>> now
>>>>> >>>>>>>> that we, unfortunately, need configuration for limitation of
>>>>> >>>>> overdraft
>>>>> >>>>>>>> number(because it is not ok if one subtask allocates all
>>>>> buffers of
>>>>> >>>>> one
>>>>> >>>>>>>> TaskManager considering that several different jobs can be
>>>>> >>>> submitted
>>>>> >>>>> on
>>>>> >>>>>>>> this TaskManager). So idea is to have
>>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
>>>>> >>>>>> LocalBufferPool).
>>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is
>>>>> >>>> reached,
>>>>> >>>>>>>> LocalBufferPool can request additionally from
>>>>> NetworkBufferPool up
>>>>> >>>> to
>>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers.
>>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a
>>>>> big
>>>>> >>>>> problem
>>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>>>>> have
>>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>>>> >>>> buffer
>>>>> >>>>>>> available and this works the same for all tasks (including
>>>>> legacy
>>>>> >>>>> source
>>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if
>>>>> at least
>>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>>>>> number"
>>>>> >>>>> is
>>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>>>> >>>>>>>
>>>>> >>>>>>> Best,
>>>>> >>>>>>> Piotrek
>>>>> >>>>>>>
>>>>> >>>>>>> On Fri, Apr 29, 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>>> Let me add some information about the LegacySource.
>>>>> >>>>>>>>
>>>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource.
>>>>> >>>>>>>> Could we add the enableOverdraft in LocalBufferPool?
>>>>> >>>>>>>> The default value is false. If the getAvailableFuture is
>>>>> called,
>>>>> >>>>>>>> change enableOverdraft=true. It indicates whether there are
>>>>> >>>>>>>> checks isAvailable elsewhere.
>>>>> >>>>>>>>
>>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me
>>>>> if
>>>>> >>>> I'm
>>>>> >>>>>>> wrong.
>>>>> >>>>>>>> Thanks
>>>>> >>>>>>>> fanrui
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <
>>>>> 1996fan...@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>>>>> Hi,
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Thanks for your quick response.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need to
>>>>> >>>>> discuss
>>>>> >>>>>>> the
>>>>> >>>>>>>>> default value in PR.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> For the legacy source, you are right. It's difficult for
>>>>> general
>>>>> >>>>>>>>> implementation.
>>>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in
>>>>> >>>>>>>>> SourceFunction.SourceContext. And call it in our common
>>>>> >>>>> LegacySource,
>>>>> >>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume
>>>>> >>>> kafka,
>>>>> >>>>> so
>>>>> >>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Core code:
>>>>> >>>>>>>>> ```
>>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>>>>> >>>>>>>>>        // Nothing to wait for: no writer, unaligned checkpoints disabled,
>>>>> >>>>>>>>>        // or the output is already available.
>>>>> >>>>>>>>>        if (recordWriter == null
>>>>> >>>>>>>>>             || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>>>>> >>>>>>>>>             return;
>>>>> >>>>>>>>>        }
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>        // Block until the record writer becomes available again;
>>>>> >>>>>>>>>        // failures are deliberately ignored here.
>>>>> >>>>>>>>>        CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>>>>> >>>>>>>>>        try {
>>>>> >>>>>>>>>             resumeFuture.get();
>>>>> >>>>>>>>>        } catch (Throwable ignored) {
>>>>> >>>>>>>>>        }
>>>>> >>>>>>>>> }
>>>>> >>>>>>>>> ```
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> LegacySource calls
>>>>> sourceContext.ensureRecordWriterIsAvailable()
>>>>> >>>>>>>>> before synchronized (checkpointLock) and collects records.
>>>>> >>>>>>>>> Please let me know if there is a better solution.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Thanks
>>>>> >>>>>>>>> fanrui
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
>>>>> >>>>>> kaa....@yandex.com>
>>>>> >>>>>>>>> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>> Hi.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or
>>>>> two
>>>>> >>>>>> commits
>>>>> >>>>>>>> in a
>>>>> >>>>>>>>>>       PR?
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this
>>>>> task has
>>>>> >>>>>> fewer
>>>>> >>>>>>>>>> questions but we should find a solution for LegacySource
>>>>> first.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
>>>>> >>>>> Unaligned
>>>>> >>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
>>>>> Because
>>>>> >>>> the
>>>>> >>>>>>>>>> overdraft
>>>>> >>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still
>>>>> needs to
>>>>> >>>>> wait
>>>>> >>>>>>> for
>>>>> >>>>>>>>>>       downstream Task to consume.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As
>>>>> I
>>>>> >>>>>>>> understand,
>>>>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it
>>>>> doesn't
>>>>> >>>>> make
>>>>> >>>>>>> it
>>>>> >>>>>>>>>> worse as well.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>     3. For the second task
>>>>> >>>>>>>>>> --      - The default value of
>>>>> maxOverdraftBuffersPerPartition
>>>>> >>>> may
>>>>> >>>>>>> also
>>>>> >>>>>>>>>> need
>>>>> >>>>>>>>>>          to be discussed.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it is a
>>>>> >>>>>>>>>> kind of optimization and users should understand what they are
>>>>> >>>>>>>>>> doing (especially if we implement the first task).
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint,
>>>>> can we
>>>>> >>>>> set
>>>>> >>>>>>> the
>>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>>>>> overdraft
>>>>> >>>>>> isn't
>>>>> >>>>>>>>>> useful for
>>>>> >>>>>>>>>>          the Aligned Checkpoint.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause degradation
>>>>> >>>>>>>>>> for the Aligned Checkpoint, I don't think that we should
>>>>> >>>>>>>>>> differentiate between AC and UC.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>       4. For the legacy source
>>>>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it uses up to
>>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>>>>> >>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>>>>> >>>>>>>>>>          buffer.
>>>>> >>>>>>>>>>          - Do you think it's ok?
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all,
>>>>> >>>>>>>>>> since it can lead to undesirable results, especially if the
>>>>> >>>>>>>>>> limit is high. At least, as I understand, it will always work in
>>>>> >>>>>>>>>> overdraft mode and it will borrow
>>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool,
>>>>> >>>>>>>>>> which can lead to degradation of other subtasks on the same
>>>>> >>>>>>>>>> TaskManager.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
>>>>> >>>>>>>>>>          LegacySource in our internal version. It works well.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I don't really understand how that is possible in the general
>>>>> >>>>>>>>>> case, considering that each user has their own implementation of
>>>>> >>>>>>>>>> LegacySourceOperator.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
>>>>> >>>>>>>>>>       submitted the PR [1].
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 29.04.2022 14:14, rui fan wrote:
>>>>> >>>>>>>>>>> Hi,
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs or
>>>>> >>>>>>>>>>>       two commits in a PR?
>>>>> >>>>>>>>>>>       2. For the first task, if the flink user disables the
>>>>> >>>>>>>>>>>       Unaligned Checkpoint, do we ignore max buffers per
>>>>> >>>>>>>>>>>       channel? Because the overdraft isn't useful for the
>>>>> >>>>>>>>>>>       Aligned Checkpoint, it still needs to wait for the
>>>>> >>>>>>>>>>>       downstream Task to consume.
>>>>> >>>>>>>>>>>       3. For the second task
>>>>> >>>>>>>>>>>          - The default value of maxOverdraftBuffersPerPartition
>>>>> >>>>>>>>>>>          may also need to be discussed.
>>>>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint, can
>>>>> >>>>>>>>>>>          we set the maxOverdraftBuffersPerPartition=0? Because
>>>>> >>>>>>>>>>>          the overdraft isn't useful for the Aligned Checkpoint.
>>>>> >>>>>>>>>>>       4. For the legacy source
>>>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses up to
>>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>>>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>>>>> >>>>>>>>>>>          buffer.
>>>>> >>>>>>>>>>>          - Do you think it's ok?
>>>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic for
>>>>> >>>>>>>>>>>          LegacySource in our internal version. It works well.
>>>>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions? I
>>>>> >>>>>>>>>>>       submitted the PR [1].
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Thanks
>>>>> >>>>>>>>>>> fanrui
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <
>>>>> >>>>>>> kaa....@yandex.com
>>>>> >>>>>>>>>>> wrote:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>> Hi,
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are
>>>>> >>>>>>>>>>>> our conclusions:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel. This
>>>>> >>>>>>>>>>>> means that if we request a memory segment from LocalBufferPool
>>>>> >>>>>>>>>>>> and maxBuffersPerChannel is reached for this channel, we just
>>>>> >>>>>>>>>>>> ignore that and continue to allocate buffers while
>>>>> >>>>>>>>>>>> LocalBufferPool has them (this is not actually an overdraft).
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
>>>>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
>>>>> >>>>>>>>>>>> option to limit the number of overdraft buffers (because it is
>>>>> >>>>>>>>>>>> not OK if one subtask allocates all the buffers of one
>>>>> >>>>>>>>>>>> TaskManager, considering that several different jobs can be
>>>>> >>>>>>>>>>>> submitted to this TaskManager). So the idea is to have
>>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>>>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the limit of buffers in
>>>>> >>>>>>>>>>>> LocalBufferPool is reached, LocalBufferPool can additionally
>>>>> >>>>>>>>>>>> request up to maxOverdraftBuffersPerPartition buffers from
>>>>> >>>>>>>>>>>> NetworkBufferPool.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it
>>>>> >>>>>>>>>>>> actually works as an unlimited flatmap and will always work in
>>>>> >>>>>>>>>>>> overdraft mode, which is not the goal. So we still need to
>>>>> >>>>>>>>>>>> think about that.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>      29.04.2022 11:11, rui fan wrote:
>>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of
>>>>> >>>>>>>>>>>>> overdraft segments that each LocalBufferPool can apply for,
>>>>> >>>>>>>>>>>>> right?
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> I prefer to hard code maxOverdraftBuffers to avoid adding a
>>>>> >>>>>>>>>>>>> new configuration option. And I hope to hear more from the
>>>>> >>>>>>>>>>>>> community.
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> Best wishes
>>>>> >>>>>>>>>>>>> fanrui
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <
>>>>> >>>>> 1996fan...@gmail.com>
>>>>> >>>>>>>>>> wrote:
>>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally
>>>>> >>>>>>>>>>>>>> right.
>>>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as
>>>>> >>>>>>>>>>>>>> the overdraft buffer, so it won't need a new buffer
>>>>> >>>>>>>>>>>>>> configuration. Flink users can turn up the maxBuffersNumber
>>>>> >>>>>>>>>>>>>> to control the overdraft buffer size.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we
>>>>> >>>>>>>>>>>>>> should limit the maximum number of overdraft segments that
>>>>> >>>>>>>>>>>>>> each LocalBufferPool can apply for.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Why do we limit it?
>>>>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable`
>>>>> >>>>>>>>>>>>>> while processing records, such as LegacySource. I have
>>>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there are
>>>>> >>>>>>>>>>>>>> other cases.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> If we don't add the limit, the LegacySource will use up all
>>>>> >>>>>>>>>>>>>> remaining memory in the NetworkBufferPool when the
>>>>> >>>>>>>>>>>>>> backpressure is severe.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> How to limit it?
>>>>> >>>>>>>>>>>>>> I prefer to hard code
>>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
>>>>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers is
>>>>> >>>>>>>>>>>>>> just for safety, and it should be enough for most Flink
>>>>> >>>>>>>>>>>>>> jobs. Or we can set
>>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
>>>>> >>>>>>>>>>>>>> to handle some jobs of low parallelism.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we
>>>>> >>>>>>>>>>>>>> can set maxOverdraftBuffers=0 in the constructor of
>>>>> >>>>>>>>>>>>>> LocalBufferPool, because the overdraft isn't useful for the
>>>>> >>>>>>>>>>>>>> Aligned Checkpoint.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Best wishes
>>>>> >>>>>>>>>>>>>> fanrui
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <
>>>>> >>>>>>>>>> kaa....@yandex.com>
>>>>> >>>>>>>>>>>>>> wrote:
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Hi fanrui,
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
>>>>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
>>>>> >>>>>>>>>>>>>>> thoughts about configuration:
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it,
>>>>> >>>>>>>>>>>>>>> right now we have the following calculation.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory
>>>>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
>>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
>>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
>>>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive
>>>>> >>>>>>>>>>>>>>> buffers * parallelism + floating buffers) * subtasks
>>>>> >>>>>>>>>>>>>>> number in TaskManager
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers in use at the
>>>>> >>>>>>>>>>>>>>> current moment (always <= requiredBuffersNumber)
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to
>>>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably.
>>>>> >>>>>>>>>>>>>>> But if requiredBuffersNumber > maxBuffersNumber, sometimes
>>>>> >>>>>>>>>>>>>>> it is also fine (but not good), since not all required
>>>>> >>>>>>>>>>>>>>> buffers are really mandatory (e.g. it is OK if Flink cannot
>>>>> >>>>>>>>>>>>>>> allocate floating buffers).
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>>>>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover
>>>>> >>>>>>>>>>>>>>> buffers (maxBuffersNumber - requiredBuffersNumber), which
>>>>> >>>>>>>>>>>>>>> I propose to use. (We can actually even use the difference
>>>>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>>>>> >>>>>>>>>>>>>>> TaskManager may contain several operators, including
>>>>> >>>>>>>>>>>>>>> 'window', which can temporarily borrow buffers from the
>>>>> >>>>>>>>>>>>>>> global pool.)
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to
>>>>> >>>>>>>>>>>>>>> requesting buffers while processing a single record, while
>>>>> >>>>>>>>>>>>>>> switching to unavailability between records should stay
>>>>> >>>>>>>>>>>>>>> the same as it is now):
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel
>>>>> >>>>>>>>>>>>>>> is reached, then just ignore this limitation and allocate
>>>>> >>>>>>>>>>>>>>> the buffer from any place (from LocalBufferPool if it
>>>>> >>>>>>>>>>>>>>> still has something, otherwise from NetworkBufferPool)
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily
>>>>> >>>>>>>>>>>>>>> allocate from NetworkBufferPool while it has something to
>>>>> >>>>>>>>>>>>>>> allocate
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but
>>>>> >>>>>>>>>>>>>>> I like it since, on the one hand, it works out of the box
>>>>> >>>>>>>>>>>>>>> without any configuration and, on the other hand, it can be
>>>>> >>>>>>>>>>>>>>> configured by changing the proportion of maxBuffersNumber
>>>>> >>>>>>>>>>>>>>> and requiredBuffersNumber.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> The last thing I want to say: I don't really want to
>>>>> >>>>>>>>>>>>>>> implement a new configuration option, since even now it is
>>>>> >>>>>>>>>>>>>>> not clear how to correctly configure network buffers with
>>>>> >>>>>>>>>>>>>>> the existing configuration and I don't want to complicate
>>>>> >>>>>>>>>>>>>>> it, especially if it is possible to resolve the problem
>>>>> >>>>>>>>>>>>>>> automatically (as described above).
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers
>>>>> >>>> correct?
>>>>> >>>>>>>>>>>>>>> --
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Best regards,
>>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>>>>> >>>>>>>>>>>>>>>> Hi everyone,
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
>>>>> >>>>>>>>>>>>>>>> Flink. It effectively solves the problem of checkpoint
>>>>> >>>>>>>>>>>>>>>> timeout or slow checkpoint when backpressure is severe.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well
>>>>> >>>>>>>>>>>>>>>> when the back pressure is severe and multiple output
>>>>> >>>>>>>>>>>>>>>> buffers are required to process a single record.
>>>>> >>>>>>>>>>>>>>>> FLINK-14396 [2] also mentioned this issue before. So we
>>>>> >>>>>>>>>>>>>>>> propose the overdraft buffer to solve it.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>>>>> >>>>>>>>>>>>>>>> overdraft buffer mechanism. After discussing with Anton
>>>>> >>>>>>>>>>>>>>>> Kalashnikov, there are still some points to discuss:
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related
>>>>> >>>>>>>>>>>>>>>>         configurations. Do we need to add a new
>>>>> >>>>>>>>>>>>>>>>         configuration for the overdraft buffer?
>>>>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer use memory?
>>>>> >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the memory remaining
>>>>> >>>>>>>>>>>>>>>>         in the NetworkBufferPool, no new configuration
>>>>> >>>>>>>>>>>>>>>>         needs to be added.
>>>>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
>>>>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at the
>>>>> >>>>>>>>>>>>>>>>             TM level or the Task level?
>>>>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the
>>>>> >>>>>>>>>>>>>>>>             number of memory-segments that can be overdrawn.
>>>>> >>>>>>>>>>>>>>>>           o What is the default value? How to set sensible
>>>>> >>>>>>>>>>>>>>>>             defaults?
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it using
>>>>> >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at
>>>>> >>>>>>>>>>>>>>>> Task level, and the default value is 10. That is: each
>>>>> >>>>>>>>>>>>>>>> LocalBufferPool can overdraw up to 10 memory-segments.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>>>>>>> fanrui
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>>>>> >>>>>>>>>>>> --
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Best regards,
>>>>> >>>>>>>>>>>> Anton Kalashnikov
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>> --
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Best regards,
>>>>> >>>>>>>>>> Anton Kalashnikov
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >> --
>>>>> >>
>>>>> >> Best regards,
>>>>> >> Anton Kalashnikov
>>>>> >>
>>>>> >>
>>>>>
>>>>
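
For readers following the thread, the overdraft idea discussed above (a local pool that, once its ordinary buffers are exhausted, may borrow a bounded number of extra segments from the shared pool) can be sketched roughly as below. This is only an illustrative model under stated assumptions, not Flink's actual LocalBufferPool or NetworkBufferPool code: the class OverdraftPoolSketch, its fields and its methods are hypothetical names, buffers are modelled as plain counters, and the ordinary buffers are assumed to be already reserved for the local pool.

```java
/**
 * Hypothetical model of the overdraft idea from this thread.
 * Not Flink code: buffers are represented only by counters.
 */
final class OverdraftPoolSketch {
    private final int poolSize;            // ordinary buffers reserved for this local pool
    private final int maxOverdraftBuffers; // cap discussed as maxOverdraftBuffersPerPartition
    private int globalFree;                // free segments left in the shared (global) pool
    private int ordinaryInUse;
    private int overdraftInUse;

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFree) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFree = globalFree;
    }

    /** Tries to hand out one buffer; returns false if the caller would have to wait. */
    boolean requestBuffer() {
        if (ordinaryInUse < poolSize) {
            ordinaryInUse++;
            return true;
        }
        // Ordinary buffers are exhausted: borrow from the global pool,
        // but never more than the configured overdraft cap.
        if (overdraftInUse < maxOverdraftBuffers && globalFree > 0) {
            overdraftInUse++;
            globalFree--;
            return true;
        }
        return false;
    }

    /** Returns one buffer; overdraft buffers go back to the global pool first. */
    void recycleBuffer() {
        if (overdraftInUse > 0) {
            overdraftInUse--;
            globalFree++;
        } else if (ordinaryInUse > 0) {
            ordinaryInUse--;
        }
    }

    /** The pool reports "available" only while ordinary buffers remain. */
    boolean isAvailable() {
        return ordinaryInUse < poolSize;
    }

    int overdraftInUse() {
        return overdraftInUse;
    }

    int globalFree() {
        return globalFree;
    }
}
```

In this model a task that checks isAvailable() between records only starts a record while ordinary buffers remain, and the overdraft is what lets it finish a record that needs several output buffers. A caller that never checks availability (the LegacySource concern raised above) would keep running in overdraft mode, which is why the cap matters.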
