Hi,

I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think?

Can you create a parent ticket for the whole FLIP to group all of the
issues together?

Also, the FLIP should be officially voted on first.

Best,
Piotrek

On Fri, May 6, 2022 at 09:08 rui fan <1996fan...@gmail.com> wrote:

> Hi Anton, Piotrek and Dawid,
>
> Thanks for your help.
>
> I created FLINK-27522[1] as the first task. And I will finish it asap.
>
> @Piotrek, for the default value, do you think it should be less
> than 5? What about 3? Actually, I don't think 5 is big. Whether it is
> 1, 3, or 5 doesn't matter much; the focus is on reasonably
> resolving the deadlock problem. Alternatively, I can push the second
> task forward first and we can discuss the default value in the PR.
>
> For the legacy source, I got your idea, and I propose we create
> a third task to handle it, because it is independent and exists for
> compatibility with the old API. What do you think? I updated
> the third task in FLIP-227[2].
>
> If all is OK, I will create a JIRA ticket for the third task and add it to
> FLIP-227. Then I will develop the tasks in order, from the first
> task to the third.
>
> Thanks again for your help.
>
> [1] https://issues.apache.org/jira/browse/FLINK-27522
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>
> Thanks
> fanrui
>
> On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi fanrui,
> >
> > > How to identify legacySource?
> >
> > Legacy sources always use the SourceStreamTask class, and
> > SourceStreamTask is used only for legacy sources. But I'm not sure how to
> > enable/disable that. Adding a `disableOverdraft()` call in SourceStreamTask
> > would be better than relying on the `getAvailableFuture()` call
> > (isn't it used for the back-pressure metric anyway?). Ideally we should
> > enable/disable it in the constructors, but that might be tricky.
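A rough sketch of this constructor-based idea (all class members and the setter below are illustrative assumptions, not the actual Flink API):

```java
// Sketch only: LocalBufferPool#setMaxOverdraftBuffers and the field layout
// are invented for illustration, not the real Flink classes.
class LocalBufferPool {
    private int maxOverdraftBuffers = 5; // assumed small default, per the thread

    void setMaxOverdraftBuffers(int n) {
        maxOverdraftBuffers = n;
    }

    int getMaxOverdraftBuffers() {
        return maxOverdraftBuffers;
    }
}

class StreamTask {
    final LocalBufferPool bufferPool = new LocalBufferPool();
}

// Legacy sources always run inside SourceStreamTask, so the constructor is a
// single central place to switch the overdraft off for all of them.
class SourceStreamTask extends StreamTask {
    SourceStreamTask() {
        // A legacy source never checks output availability, so letting it
        // overdraft could drain the global pool; disable the feature here.
        bufferPool.setMaxOverdraftBuffers(0);
    }
}
```

The appeal of the constructor approach is that it needs no runtime detection: the task type alone decides whether the overdraft is on.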
> >
> > > I prefer it to be between 5 and 10
> >
> > I would vote for a smaller value because of FLINK-13203
> >
> > Piotrek
> >
> >
> >
> > On Thu, May 5, 2022 at 11:49 rui fan <1996fan...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> Thanks a lot for your discussion.
> >>
> >> After several discussions, I think it's clear now. I updated the
> >> "Proposed Changes" section of FLIP-227[1]. If I have missed
> >> anything, please add it to the FLIP, or mention it in this thread
> >> and I can add it to the FLIP. If everything is OK, I will create a
> >> new JIRA ticket for the first task and use FLINK-26762[2] as the
> >> second task.
> >>
> >> About the legacy source, do we set maxOverdraftBuffersPerGate=0
> >> directly? How do we identify a legacy source? Or could we add an
> >> overdraftEnabled flag to the LocalBufferPool, with a default value
> >> of false, and change it to overdraftEnabled=true once
> >> getAvailableFuture is called?
> >> The flag would indicate whether isAvailable is checked elsewhere.
> >> That might be more general; it could cover more cases.
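The flag-based alternative could look roughly like this (a sketch only; the class and method names are invented, not Flink code):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the proposal above: the overdraft starts disabled and is enabled
// the first time a caller asks for the availability future, i.e. the first
// time the task demonstrably checks isAvailable somewhere.
class OverdraftGate {
    private boolean overdraftEnabled = false;

    CompletableFuture<Void> getAvailableFuture() {
        // Whoever calls this checks availability between records, so it is
        // considered safe to grant overdraft buffers to this task later.
        overdraftEnabled = true;
        return CompletableFuture.completedFuture(null);
    }

    boolean isOverdraftEnabled() {
        return overdraftEnabled;
    }
}
```

A legacy source that never calls getAvailableFuture would therefore never flip the flag and never overdraft, which is the generality argued for above.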
> >>
> >> Also, I think the default value of 'max-overdraft-buffers-per-gate'
> >> needs to be confirmed. I prefer a value between 5 and 10. What
> >> do you think?
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> >> [2] https://issues.apache.org/jira/browse/FLINK-26762
> >>
> >> Thanks
> >> fanrui
> >>
> >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
> >> wrote:
> >>
> >>> Hi again,
> >>>
> >>> After sleeping on this, if both versions (reserve and overdraft) have
> >>> the same complexity, I would also prefer the overdraft.
> >>>
> >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
> >>> > Dawid mentioned, I think it is dangerous since it is too implicit for
> >>> > the user and if the user submits one more job for the same TaskManager
> >>>
> >>> As I mentioned, it's not only an issue with multiple jobs. The same
> >>> problem can happen with different subtasks from the same job, potentially
> >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would
> >>> be in favour of Integer.MAX_VALUE as the default value, but as it is, I
> >>> think we should indeed play it safe and limit it.
> >>>
> >>> > I still don't understand how the "reserve" implementation should be
> >>> > limited.
> >>> > I mean, if we have X buffers in total and the user sets the overdraft
> >>> > equal to X, we obviously cannot reserve all buffers, but how many are
> >>> > we allowed to reserve? Should it be a different configuration like
> >>> > percentageForReservedBuffers?
> >>>
> >>> The reserve could be defined as a percentage, or as a fixed number of
> >>> buffers. But yes, in normal operation a subtask would not use the
> >>> reserve: if numberOfAvailableBuffers < reserve, the output would not be
> >>> available.
> >>> Only in the flatMap/timers/huge-records case could the reserve be used.
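A minimal model of this reserve variant (names invented for illustration; the real LocalBufferPool is more involved):

```java
// Sketch of the "reserve" idea, assuming availability is only consulted
// between records, while in-record requests may dip into the reserve.
class ReservePool {
    private final int totalBuffers;
    private final int reserve; // must stay strictly below totalBuffers
    private int used;

    ReservePool(int totalBuffers, int reserve) {
        if (reserve >= totalBuffers) {
            throw new IllegalArgumentException("reserve must be < total buffers");
        }
        this.totalBuffers = totalBuffers;
        this.reserve = reserve;
    }

    // Checked between records: the reserve is excluded, so the output goes
    // unavailable while `reserve` buffers are still physically free.
    boolean isAvailable() {
        return totalBuffers - used > reserve;
    }

    // Called while processing a single record (flatMap/timers/huge records):
    // may dip into the reserve, down to the last physical buffer.
    boolean requestBufferWithinRecord() {
        if (used < totalBuffers) {
            used++;
            return true;
        }
        return false;
    }
}
```

The constructor check mirrors the point below that the reserve must be smaller than the total buffer size, or the pool could never become available.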
> >>>
> >>> > 1. If the total number of buffers in the LocalBufferPool is <= the
> >>> > number of reserve buffers, will the LocalBufferPool never be
> >>> > available and be unable to process data?
> >>>
> >>> Of course we would need to make sure that never happens. So the reserve
> >>> should be < total buffer size.
> >>>
> >>> > 2. If the overdraft buffer uses the extra buffers, then when the
> >>> > downstream task's input buffers are insufficient, the job should fail
> >>> > to start and then restart? When the input buffers are initialized,
> >>> > they will request enough buffers, right?
> >>>
> >>> The failover when the downstream task cannot allocate buffers is already
> >>> implemented in FLINK-14872 [2]. There is a timeout on how long the task
> >>> waits for buffer allocation. However, this doesn't prevent many
> >>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
> >>> solution for [1] would be 2b, as described in the ticket:
> >>>
> >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This
> >>> is a simplified version of 2a, without tracking the tasks sink-to-source.
> >>>
> >>> But that's a pre-existing problem, and I don't think we have to solve it
> >>> before implementing the overdraft. I think we would need to solve it only
> >>> before setting Integer.MAX_VALUE as the default for the overdraft. I
> >>> might also hesitate to set the overdraft to anything more than a couple
> >>> of buffers by default, for the same reason.
> >>>
> >>> > Actually, I totally agree that we don't need a lot of buffers for
> >>> overdraft
> >>>
> >>> and
> >>>
> >>> > Also, I agree we can ignore overdraftBuffers=numberOfSubpartitions for
> >>> > now. When we finish this feature and users have tried it, we can
> >>> > discuss this again if they report the issue.
> >>>
> >>> +1
> >>>
> >>> Piotrek
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-13203
> >>> [2] https://issues.apache.org/jira/browse/FLINK-14872
> >>>
> >>> On Thu, May 5, 2022 at 05:52 rui fan <1996fan...@gmail.com> wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> I still have some questions.
> >>>>
> >>>> 1. If the total number of buffers in the LocalBufferPool is <= the
> >>>> number of reserve buffers, will the LocalBufferPool never be available
> >>>> and be unable to process data?
> >>>> 2. If the overdraft buffer uses the extra buffers, then when the
> >>>> downstream task's input buffers are insufficient, the job should fail
> >>>> to start and then restart? When the input buffers are initialized,
> >>>> they will request enough buffers, right?
> >>>>
> >>>> Also, I agree we can ignore overdraftBuffers=numberOfSubpartitions for
> >>>> now. When we finish this feature and users have tried it, we can
> >>>> discuss this again if they report the issue.
> >>>>
> >>>> Thanks
> >>>> fanrui
> >>>>
> >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <
> dwysakow...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hey all,
> >>>>>
> >>>>> I have not replied in the thread yet, but I was following the
> >>>>> discussion.
> >>>>>
> >>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand
> >>>>> it, the idea of distinguishing between inside flatMap & outside would
> >>>>> be fairly simple, but maybe slightly indirect. The checkAvailability
> >>>>> would remain unchanged, and it is always checked between separate
> >>>>> invocations of the UDF. Therefore the overdraft buffers would not
> >>>>> apply there. However, once the pool says it is available, it means it
> >>>>> has at least an initial buffer. So any additional request made without
> >>>>> checking for availability can be considered to be inside the
> >>>>> processing of a single record. This just does not hold for the
> >>>>> LegacySource, as I don't think it actually checks for
> >>>>> the availability of buffers in the LocalBufferPool.
> >>>>>
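That distinction can be modeled in a few lines (a sketch with invented names, assuming availability is only consulted between records):

```java
// Minimal model: checkAvailability() keeps its current meaning and is
// consulted between records; any further request made while the pool is
// "unavailable" is by definition inside a record and may use the overdraft.
class OverdraftPool {
    private final int poolSize;     // ordinary (exclusive + floating) budget
    private final int maxOverdraft; // extra buffers borrowed from the global pool
    private int used;

    OverdraftPool(int poolSize, int maxOverdraft) {
        this.poolSize = poolSize;
        this.maxOverdraft = maxOverdraft;
    }

    // Unchanged logic: available iff at least one ordinary buffer is left.
    boolean checkAvailability() {
        return used < poolSize;
    }

    // In-record requests may exceed the pool size by up to maxOverdraft.
    boolean requestBuffer() {
        if (used < poolSize + maxOverdraft) {
            used++;
            return true;
        }
        return false;
    }
}
```

With poolSize=2 and maxOverdraft=3, the pool reports unavailable after two requests, yet three more in-record requests still succeed before the hard limit.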
> >>>>> In an offline chat with Anton, we also discussed whether we need a
> >>>>> limit on the number of buffers we could overdraft (or, in other words,
> >>>>> whether the limit should be equal to Integer.MAX_VALUE), but
> >>>>> personally I'd prefer to stay on the safe side and have it limited.
> >>>>> The pool of network buffers is shared by the entire TaskManager, which
> >>>>> means it can be shared even across tasks of separate jobs. However, I
> >>>>> might just be unnecessarily cautious here.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Dawid
> >>>>>
> >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
> >>>>> > Hi,
> >>>>> >
> >>>>> > Thanks for the answers.
> >>>>> >
> >>>>> >> we may still need to discuss whether the
> >>>>> >> overdraft/reserve/spare should use extra buffers or buffers
> >>>>> >> in (exclusive + floating buffers)?
> >>>>> > and
> >>>>> >
> >>>>> >> These things solve different problems (at least as I see it).
> >>>>> >> The current hardcoded "1" says that we switch from "available" to
> >>>>> >> "unavailable" when one more buffer is left (actually a little less
> >>>>> >> than one buffer, since we write the last piece of data into this
> >>>>> >> last buffer). The overdraft feature doesn't change this logic: we
> >>>>> >> still want to switch to "unavailable" in the same way, but if we are
> >>>>> >> already "unavailable" and we want more buffers, then we can take
> >>>>> >> "overdraft number" more. So we cannot avoid this hardcoded "1",
> >>>>> >> since we need to understand when we should switch to "unavailable".
> >>>>> > Ok, I see. So it seems to me that both of you have in mind to keep
> >>>>> > the buffer pools as they are right now, but if we are in the middle
> >>>>> > of processing a record, we can request extra overdraft buffers on
> >>>>> > top of those? This is a different way to implement the overdraft
> >>>>> > from what I was thinking. I was thinking about something like
> >>>>> > keeping the "overdraft", or more precisely a buffer "reserve", in
> >>>>> > the buffer pool. I think my version would be easier to implement,
> >>>>> > because it is just fiddling with the min/max buffer calculation plus
> >>>>> > slightly modified `checkAvailability()` logic.
> >>>>> >
> >>>>> > On the other hand, what you have in mind would better utilise the
> >>>>> > available memory, right? It would require more code changes (how
> >>>>> > would we know when we are allowed to request the overdraft?).
> >>>>> > However, in this case, I would be tempted to set the number of
> >>>>> > overdraft buffers by default to `Integer.MAX_VALUE`, and let the
> >>>>> > system request as many buffers as necessary. The only downside that
> >>>>> > I can think of (apart from higher complexity) would be a higher
> >>>>> > chance of hitting a known/unsolved deadlock [1] in this scenario:
> >>>>> > - the downstream task hasn't started yet
> >>>>> > - the upstream task requests the overdraft and uses all available
> >>>>> > memory segments from the global pool
> >>>>> > - the upstream task is blocked, because the downstream task hasn't
> >>>>> > started yet and cannot consume any data
> >>>>> > - the downstream task tries to start, but cannot, as there are no
> >>>>> > available buffers
> >>>>> >
> >>>>> >> BTW, for watermarks, the number of buffers needed is
> >>>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
> >>>>> >> the watermark won't block in requestMemory.
> >>>>> > and
> >>>>> >
> >>>>> >> the best overdraft size will be equal to the parallelism.
> >>>>> > That's a lot of buffers. I don't think we need that many for
> >>>>> > broadcasting watermarks. Watermarks are small, and remember that
> >>>>> > every subpartition has some partially filled/empty WIP buffer, so
> >>>>> > the vast majority of subpartitions will not need to request a new
> >>>>> > buffer.
> >>>>> >
> >>>>> > Best,
> >>>>> > Piotrek
> >>>>> >
> >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
> >>>>> >
> >>>>> > On Tue, May 3, 2022 at 17:15 Anton Kalashnikov <kaa....@yandex.com>
> >>>>> > wrote:
> >>>>> >
> >>>>> >> Hi,
> >>>>> >>
> >>>>> >>
> >>>>> >>   >> Do you mean to ignore it while processing records, but keep
> >>>>> using
> >>>>> >> `maxBuffersPerChannel` when calculating the availability of the
> >>>>> output?
> >>>>> >>
> >>>>> >>
> >>>>> >> Yes, it is correct.
> >>>>> >>
> >>>>> >>
> >>>>> >>   >> Would it be a big issue if we changed it to check if at least
> >>>>> >> "overdraft number of buffers are available", where "overdraft
> >>>>> number" is
> >>>>> >> configurable, instead of the currently hardcoded value of "1"?
> >>>>> >>
> >>>>> >>
> >>>>> >> These things solve different problems (at least as I see it).
> >>>>> >> The current hardcoded "1" says that we switch from "available" to
> >>>>> >> "unavailable" when one more buffer is left (actually a little less
> >>>>> >> than one buffer, since we write the last piece of data into this
> >>>>> >> last buffer). The overdraft feature doesn't change this logic: we
> >>>>> >> still want to switch to "unavailable" in the same way, but if we are
> >>>>> >> already "unavailable" and we want more buffers, then we can take
> >>>>> >> "overdraft number" more. So we cannot avoid this hardcoded "1",
> >>>>> >> since we need to understand when we should switch to "unavailable".
> >>>>> >>
> >>>>> >>
> >>>>> >> -- About "reserve" vs "overdraft"
> >>>>> >>
> >>>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
> >>>>> >> equal to the parallelism. Also, the user can set any value they
> >>>>> >> want. So even if the parallelism is small (~5) but the user's
> >>>>> >> flatMap produces a lot of data, the user can set 10 or even more,
> >>>>> >> which almost doubles the max buffers and makes it impossible to
> >>>>> >> reserve. At least we need to figure out how to protect against such
> >>>>> >> cases (a limit for the overdraft?). So actually it looks even more
> >>>>> >> difficult than increasing the maximum number of buffers.
> >>>>> >>
> >>>>> >> I want to emphasize that the overdraft is a soft configuration,
> >>>>> >> which means a subtask takes as many buffers as the global buffer
> >>>>> >> pool has available (maybe zero), but no more than the configured
> >>>>> >> value. It is also important to note that perhaps not many subtasks
> >>>>> >> in a TaskManager will be using this feature, so we don't actually
> >>>>> >> need a lot of available buffers for every subtask (here I mean that
> >>>>> >> if we have only one window/flatMap operator and many other
> >>>>> >> operators, then one TaskManager will have many ordinary subtasks
> >>>>> >> which don't actually need the overdraft and several subtasks that
> >>>>> >> do need this feature). But in the case of reservation, we would
> >>>>> >> reserve some buffers for all operators even if they don't really
> >>>>> >> need them.
> >>>>> >>
> >>>>> >>
> >>>>> >> -- Legacy source problem
> >>>>> >>
> >>>>> >> If we still want to change the max buffers, then it is a problem
> >>>>> >> for legacy sources (since every source subtask will always use the
> >>>>> >> overdraft). But right now, I think that we can force the overdraft
> >>>>> >> to 0 buffers for legacy subtasks in the configuration during
> >>>>> >> execution (if it is not too late to change the configuration at
> >>>>> >> that point).
> >>>>> >>
> >>>>> >>
> >>>>> >> On 03.05.2022 14:11, rui fan wrote:
> >>>>> >>> Hi
> >>>>> >>>
> >>>>> >>> Thanks to Martijn Visser and Piotrek for the feedback. I agree
> >>>>> >>> with ignoring the legacy source; otherwise it will affect our
> >>>>> >>> design. Users should use the new Source API as much as possible.
> >>>>> >>>
> >>>>> >>> Hi Piotrek, we may still need to discuss whether the
> >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers from
> >>>>> >>> (exclusive + floating buffers). They have some differences.
> >>>>> >>>
> >>>>> >>> If it uses extra buffers:
> >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + 1
> >>>>> >>> <= currentPoolSize) and all subpartitions don't reach
> >>>>> >>> maxBuffersPerChannel.
> >>>>> >>>
> >>>>> >>> If it uses the buffers in (exclusive + floating buffers):
> >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
> >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
> >>>>> >>> don't reach maxBuffersPerChannel.
> >>>>> >>> 2. For low-parallelism jobs, if overdraftBuffers is large (>8),
> >>>>> >>> the number of usable buffers will be small; that is, the
> >>>>> >>> LocalBufferPool will easily become unavailable. For throughput, if
> >>>>> >>> users turn up the overdraft buffers, they need to turn up the
> >>>>> >>> exclusive or floating buffers as well. It also affects the
> >>>>> >>> InputChannel, and it is unfriendly to users.
> >>>>> >>>
> >>>>> >>> So I prefer the overdraft to use extra buffers.
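The two conditions can be put side by side (a sketch with invented names; the real check additionally requires that no subpartition has reached maxBuffersPerChannel):

```java
// Sketch of the two availability conditions discussed above.
final class AvailabilityVariants {

    // Variant A (extra buffers): the overdraft lives on top of the pool, so
    // availability still only needs one free ordinary buffer.
    static boolean availableWithExtraBuffers(int usedBuffers, int currentPoolSize) {
        return usedBuffers + 1 <= currentPoolSize;
    }

    // Variant B (within exclusive + floating): the overdraft is carved out of
    // the pool itself, so a large overdraft makes the pool unavailable sooner.
    static boolean availableWithinPool(
            int usedBuffers, int overdraftBuffers, int currentPoolSize) {
        return usedBuffers + overdraftBuffers <= currentPoolSize;
    }
}
```

With a pool of 10 buffers and 8 in use, variant A is still available, while variant B with overdraftBuffers=3 is not, which is why a large overdraft under variant B makes the pool easily unavailable.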
> >>>>> >>>
> >>>>> >>>
> >>>>> >>> BTW, for watermarks, the number of buffers needed is
> >>>>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
> >>>>> >>> the watermark won't block in requestMemory. But this has
> >>>>> >>> 2 problems:
> >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
> >>>>> >>> (exclusive + floating buffers), there will be fewer buffers
> >>>>> >>> available, and throughput may be affected.
> >>>>> >>> 2. The numberOfSubpartitions is different for each task. So if
> >>>>> >>> users want to cover watermarks using this feature, they don't
> >>>>> >>> know how to set overdraftBuffers reasonably. And if the
> >>>>> >>> parallelism changes, users still need to change overdraftBuffers.
> >>>>> >>> It is unfriendly to users.
> >>>>> >>>
> >>>>> >>> So I propose we support overdraftBuffers=-1, meaning we
> >>>>> >>> automatically set overdraftBuffers=numberOfSubpartitions
> >>>>> >>> in the constructor of LocalBufferPool.
> >>>>> >>>
> >>>>> >>> Please correct me if I'm wrong.
> >>>>> >>>
> >>>>> >>> Thanks
> >>>>> >>> fanrui
> >>>>> >>>
> >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <
> >>>>> pnowoj...@apache.org>
> >>>>> >> wrote:
> >>>>> >>>> Hi fanrui,
> >>>>> >>>>
> >>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive
> >>>>> >> buffers *
> >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be
> >>>>> available
> >>>>> >>>> when
> >>>>> >>>>> (usedBuffers+overdraftBuffers <=
> >>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers)
> >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel,
> >>>>> right?
> >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum
> >>>>> >>>> number of required buffers, just as we did when we were
> >>>>> >>>> implementing the non-blocking outputs and adding the availability
> >>>>> >>>> logic to LocalBufferPool. Back then we added "+ 1" to the minimum
> >>>>> >>>> number of buffers. Currently this logic is located in
> >>>>> >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
> >>>>> >>>>
> >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
> >>>>> >>>>
> >>>>> >>>> For performance reasons, we always require at least one buffer
> >>>>> >>>> per sub-partition; otherwise performance falls drastically. Now,
> >>>>> >>>> if we require 5 overdraft buffers for the output to be available,
> >>>>> >>>> we need to have them on top of that "one buffer per
> >>>>> >>>> sub-partition". So the logic should be changed to:
> >>>>> >>>>
> >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
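The adjusted formula can be isolated as a small helper (a standalone sketch; the real logic lives in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition and takes more parameters):

```java
// Sketch of the adjusted minimum-buffer calculation quoted above, isolated
// for illustration; parameter names mirror the thread, not the real method.
final class MinBuffers {
    static int minBuffers(
            boolean isSortShuffle,
            int sortShuffleMinBuffers,
            int numSubpartitions,
            int numOverdraftBuffers) {
        // Previously the pipelined case was `numSubpartitions + 1`; with an
        // overdraft of N, availability needs N spare buffers on top of the
        // one-buffer-per-subpartition floor.
        return isSortShuffle
                ? sortShuffleMinBuffers
                : numSubpartitions + numOverdraftBuffers;
    }
}
```

For example, with 100 subpartitions and 5 overdraft buffers the pipelined minimum goes from 101 to 105, while the sort-shuffle path is unaffected.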
> >>>>> >>>>
> >>>>> >>>> Regarding increasing the number of max buffers, I'm not sure. It
> >>>>> >>>> should be fine as long as "overdraft << max number of buffers",
> >>>>> >>>> because all buffers on the outputs are shared across all
> >>>>> >>>> sub-partitions. If we have 5 overdraft buffers and a parallelism
> >>>>> >>>> of 100, it doesn't matter in the grand scheme of things whether
> >>>>> >>>> we make the output available when at least one single buffer is
> >>>>> >>>> available or when at least 5 buffers are available out of ~200
> >>>>> >>>> (100 * 2 + 8). So the effect of increasing the overdraft from 1
> >>>>> >>>> to, for example, 5 should be negligible. For a small parallelism,
> >>>>> >>>> like 5, increasing the overdraft from 1 to 5 still increases the
> >>>>> >>>> total number of buffers by only about 25%. So maybe we can keep
> >>>>> >>>> the max as it is?
> >>>>> >>>>
> >>>>> >>>> If so, maybe we should change the name from "overdraft" to
> "buffer
> >>>>> >> reserve"
> >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept
> in
> >>>>> >> reserve
> >>>>> >>>> in case of flatMap/firing timers/huge records"?
> >>>>> >>>>
> >>>>> >>>> What do you think, Fanrui, Anton?
> >>>>> >>>>
> >>>>> >>>> Re LegacySources: I agree we can mostly ignore them in the new
> >>>>> >>>> features, as long as we don't break the existing deployments too
> >>>>> >>>> much.
> >>>>> >>>>
> >>>>> >>>> Best,
> >>>>> >>>> Piotrek
> >>>>> >>>>
> >>>>> >>>> On Tue, May 3, 2022 at 09:20 Martijn Visser <mart...@ververica.com>
> >>>>> >>>> wrote:
> >>>>> >>>>> Hi everyone,
> >>>>> >>>>>
> >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources:
> >>>>> IMHO, we
> >>>>> >>>> should
> >>>>> >>>>> not focus too much on improving/adding capabilities for legacy
> >>>>> sources.
> >>>>> >>>> We
> >>>>> >>>>> want to persuade and push users to use the new Source API. Yes,
> >>>>> this
> >>>>> >>>> means
> >>>>> >>>>> that there's work required by the end users to port any custom
> >>>>> source
> >>>>> >> to
> >>>>> >>>>> the new interface. The benefits of the new Source API should
> >>>>> outweigh
> >>>>> >>>> this.
> >>>>> >>>>> Anything that we build to support multiple interfaces means
> >>>>> adding more
> >>>>> >>>>> complexity and more possibilities for bugs. Let's try to make
> our
> >>>>> >> lives a
> >>>>> >>>>> little bit easier.
> >>>>> >>>>>
> >>>>> >>>>> Best regards,
> >>>>> >>>>>
> >>>>> >>>>> Martijn Visser
> >>>>> >>>>> https://twitter.com/MartijnVisser82
> >>>>> >>>>> https://github.com/MartijnVisser
> >>>>> >>>>>
> >>>>> >>>>>
> >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com>
> >>>>> wrote:
> >>>>> >>>>>
> >>>>> >>>>>> Hi Piotrek
> >>>>> >>>>>>
> >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
> >>>>> using
> >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of
> the
> >>>>> >>>> output?
> >>>>> >>>>>> I think yes; Anton Kalashnikov, please help double-check.
> >>>>> >>>>>>
> >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a
> >>>>> big
> >>>>> >>>>> problem
> >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we
> already
> >>>>> have
> >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
> >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a
> single
> >>>>> >>>> buffer
> >>>>> >>>>>>> available and this works the same for all tasks (including
> >>>>> legacy
> >>>>> >>>>> source
> >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if
> >>>>> at least
> >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
> >>>>> number"
> >>>>> >>>>> is
> >>>>> >>>>>>> configurable, instead of the currently hardcoded value of
> "1"?
> >>>>> >>>>>> Do you mean we don't add extra buffers, and just use (exclusive
> >>>>> >>>>>> buffers * parallelism + floating buffers)? The LocalBufferPool
> >>>>> >>>>>> will be available when (usedBuffers + overdraftBuffers <=
> >>>>> >>>>>> exclusiveBuffers * parallelism + floatingBuffers)
> >>>>> >>>>>> and all subpartitions don't reach maxBuffersPerChannel, right?
> >>>>> >>>>>>
> >>>>> >>>>>> If yes, I think it can solve the legacy source problem, but
> >>>>> >>>>>> there may be some impact. If overdraftBuffers is large and only
> >>>>> >>>>>> one buffer is needed to process a single record, then
> >>>>> >>>>>> (exclusive buffers * parallelism + floating buffers) cannot all
> >>>>> >>>>>> be used; it may only be possible to use (exclusive buffers *
> >>>>> >>>>>> parallelism + floating buffers - overdraft buffers + 1). For
> >>>>> >>>>>> throughput, if users turn up the overdraft buffers, they also
> >>>>> >>>>>> need to turn up the exclusive or floating buffers. And it also
> >>>>> >>>>>> affects the InputChannel.
> >>>>> >>>>>>
> >>>>> >>>>>> If not, I don't think it can solve the legacy source problem.
> >>>>> >>>>>> Legacy sources don't check isAvailable; if there are extra
> >>>>> >>>>>> buffers, a legacy source will use them up until it blocks in
> >>>>> >>>>>> requestMemory.
> >>>>> >>>>>>
> >>>>> >>>>>>
> >>>>> >>>>>> Thanks
> >>>>> >>>>>> fanrui
> >>>>> >>>>>>
> >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <
> >>>>> pnowoj...@apache.org>
> >>>>> >>>>>> wrote:
> >>>>> >>>>>>
> >>>>> >>>>>>> Hi,
> >>>>> >>>>>>>
> >>>>> >>>>>>> +1 for the general proposal from my side. It would be a nice
> >>>>> >>>>>>> workaround for the flatMap, WindowOperator, and large-record
> >>>>> >>>>>>> issues with unaligned checkpoints.
> >>>>> >>>>>>>
> >>>>> >>>>>>>> The first task is about ignoring max buffers per channel.
> >>>>> >>>>>>>> This means that if we request a memory segment from the
> >>>>> >>>>>>>> LocalBufferPool and maxBuffersPerChannel is reached for this
> >>>>> >>>>>>>> channel, we just ignore that and continue to allocate buffers
> >>>>> >>>>>>>> while the LocalBufferPool has them (it is actually not an
> >>>>> >>>>>>>> overdraft).
> >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
> >>>>> using
> >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of
> the
> >>>>> >>>> output?
> >>>>> >>>>>>>> The second task is about the real overdraft. I am now pretty
> >>>>> >>>>>>>> convinced that we, unfortunately, need a configuration to
> >>>>> >>>>>>>> limit the overdraft number (because it is not OK if one
> >>>>> >>>>>>>> subtask allocates all the buffers of one TaskManager,
> >>>>> >>>>>>>> considering that several different jobs can be submitted to
> >>>>> >>>>>>>> this TaskManager). So the idea is to have
> >>>>> >>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
> >>>>> >>>>>>>> LocalBufferPool). In this case, when the buffer limit of the
> >>>>> >>>>>>>> LocalBufferPool is reached, the LocalBufferPool can request
> >>>>> >>>>>>>> up to maxOverdraftBuffersPerPartition additional buffers from
> >>>>> >>>>>>>> the NetworkBufferPool.
> >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a
> >>>>> big
> >>>>> >>>>> problem
> >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we
> already
> >>>>> have
> >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
> >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a
> single
> >>>>> >>>> buffer
> >>>>> >>>>>>> available and this works the same for all tasks (including
> >>>>> legacy
> >>>>> >>>>> source
> >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if
> >>>>> at least
> >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
> >>>>> number"
> >>>>> >>>>> is
> >>>>> >>>>>>> configurable, instead of the currently hardcoded value of
> "1"?
> >>>>> >>>>>>>
> >>>>> >>>>>>> Best,
> >>>>> >>>>>>> Piotrek
> >>>>> >>>>>>>
> >>>>> >>>>>>> On Fri, Apr 29, 2022 at 17:04 rui fan <1996fan...@gmail.com>
> >>>>> >>>>>>> wrote:
> >>>>> >>>>>>>
> >>>>> >>>>>>>> Let me add some information about the LegacySource.
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> If we want to disable the overdraft buffer for the
> >>>>> >>>>>>>> LegacySource, could we add an enableOverdraft flag to the
> >>>>> >>>>>>>> LocalBufferPool? The default value would be false; if
> >>>>> >>>>>>>> getAvailableFuture is called, we change enableOverdraft=true.
> >>>>> >>>>>>>> It indicates whether isAvailable is checked elsewhere.
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct
> me
> >>>>> if
> >>>>> >>>> I'm
> >>>>> >>>>>>> wrong.
> >>>>> >>>>>>>> Thanks
> >>>>> >>>>>>>> fanrui
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <
> >>>>> 1996fan...@gmail.com>
> >>>>> >>>>> wrote:
> >>>>> >>>>>>>>> Hi,
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> Thanks for your quick response.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need
> to
> >>>>> >>>>> discuss
> >>>>> >>>>>>> the
> >>>>> >>>>>>>>> default value in PR.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> For the legacy source, you are right: it's difficult to
> >>>>> >>>>>>>>> implement in a general way. Currently, we implement
> >>>>> >>>>>>>>> ensureRecordWriterIsAvailable() in
> >>>>> >>>>>>>>> SourceFunction.SourceContext and call it in our common
> >>>>> >>>>>>>>> legacy sources, e.g. FlinkKafkaConsumer. Over 90% of our
> >>>>> >>>>>>>>> Flink jobs consume Kafka, so fixing FlinkKafkaConsumer
> >>>>> >>>>>>>>> solved most of our problems.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> Core code:
> >>>>> >>>>>>>>> ```
> >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
> >>>>> >>>>>>>>>     if (recordWriter == null
> >>>>> >>>>>>>>>             || !configuration.getBoolean(
> >>>>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
> >>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
> >>>>> >>>>>>>>>         return;
> >>>>> >>>>>>>>>     }
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
> >>>>> >>>>>>>>>     try {
> >>>>> >>>>>>>>>         resumeFuture.get();
> >>>>> >>>>>>>>>     } catch (Throwable ignored) {
> >>>>> >>>>>>>>>     }
> >>>>> >>>>>>>>> }
> >>>>> >>>>>>>>> ```
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> The LegacySource calls
> >>>>> >>>>>>>>> sourceContext.ensureRecordWriterIsAvailable() before
> >>>>> >>>>>>>>> entering synchronized (checkpointLock) and collecting
> >>>>> >>>>>>>>> records. Please let me know if there is a better solution.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> Thanks
> >>>>> >>>>>>>>> fanrui
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
> >>>>> >>>>>> kaa....@yandex.com>
> >>>>> >>>>>>>>> wrote:
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>>> Hi.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs, or two PRs, or
> >>>>> >>>>>>>>>>       two commits in a PR?
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> Perhaps a separate ticket will be better, since this task
> >>>>> >>>>>>>>>> has fewer open questions, but we should find a solution for
> >>>>> >>>>>>>>>> the LegacySource first.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
> >>>>> >>>>> Unaligned
> >>>>> >>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
> >>>>> Because
> >>>>> >>>> the
> >>>>> >>>>>>>>>> overdraft
> >>>>> >>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still
> >>>>> needs to
> >>>>> >>>>> wait
> >>>>> >>>>>>> for
> >>>>> >>>>>>>>>>       downstream Task to consume.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I
> >>>>> >>>>>>>>>> understand, the overdraft may not be really helpful for AC,
> >>>>> >>>>>>>>>> but it doesn't make it worse either.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>>     3. For the second task
> >>>>> >>>>>>>>>> --      - The default value of
> >>>>> maxOverdraftBuffersPerPartition
> >>>>> >>>> may
> >>>>> >>>>>>> also
> >>>>> >>>>>>>>>> need
> >>>>> >>>>>>>>>>          to be discussed.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it
> >>>>> >>>>>>>>>> is a kind of optimization and users should understand what
> >>>>> >>>>>>>>>> they are doing (especially if we implement the first task).
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint,
> >>>>> can we
> >>>>> >>>>> set
> >>>>> >>>>>>> the
> >>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
> >>>>> overdraft
> >>>>> >>>>>> isn't
> >>>>> >>>>>>>>>> useful for
> >>>>> >>>>>>>>>>          the Aligned Checkpoint.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause
> >>>>> >>>>>>>>>> degradation for the Aligned Checkpoint, I don't think we
> >>>>> >>>>>>>>>> should make a difference between AC and UC.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>>       4. For the legacy source
> >>>>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it uses up
> >>>>> to
> >>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
> >>>>> >>>>>>>>>>          - If disabling the UC, it doesn't use the
> overdraft
> >>>>> >>>> buffer.
> >>>>> >>>>>>>>>>          - Do you think it's ok?
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> Ideally, I don't want to use the overdraft for LegacySource
> >>>>> >>>>>>>>>> at all, since it can lead to undesirable results, especially
> >>>>> >>>>>>>>>> if the limit is high. At least, as I understand, it will
> >>>>> >>>>>>>>>> always work in overdraft mode and will borrow
> >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool,
> >>>>> >>>>>>>>>> which can lead to degradation of other subtasks on the same
> >>>>> >>>>>>>>>> TaskManager.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
> >>>>> >>>>>> LegacySource
> >>>>> >>>>>>>> in
> >>>>> >>>>>>>>>> our
> >>>>> >>>>>>>>>>          internal version. It works well.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> I don't really understand how that is possible for the
> >>>>> >>>>>>>>>> general case, considering that each user has their own
> >>>>> >>>>>>>>>> implementation of LegacySourceOperator.
> >>>>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
> >>>>> >>>>> submitted
> >>>>> >>>>>>> the
> >>>>> >>>>>>>> PR
> >>>>> >>>>>>>>>>       [1].
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> 29.04.2022 14:14, rui fan wrote:
> >>>>> >>>>>>>>>>> Hi,
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs
> >>>>> or two
> >>>>> >>>>>>> commits
> >>>>> >>>>>>>>>> in a
> >>>>> >>>>>>>>>>>       PR?
> >>>>> >>>>>>>>>>>       2. For the first task, if the flink user disables
> the
> >>>>> >>>>>> Unaligned
> >>>>> >>>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
> >>>>> Because
> >>>>> >>>>> the
> >>>>> >>>>>>>>>> overdraft
> >>>>> >>>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still
> >>>>> needs to
> >>>>> >>>>>> wait
> >>>>> >>>>>>>> for
> >>>>> >>>>>>>>>>>       downstream Task to consume.
> >>>>> >>>>>>>>>>>       3. For the second task
> >>>>> >>>>>>>>>>>          - The default value of
> >>>>> maxOverdraftBuffersPerPartition
> >>>>> >>>>> may
> >>>>> >>>>>>> also
> >>>>> >>>>>>>>>> need
> >>>>> >>>>>>>>>>>          to be discussed.
> >>>>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint,
> >>>>> can we
> >>>>> >>>>> set
> >>>>> >>>>>>> the
> >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
> >>>>> >>>> overdraft
> >>>>> >>>>>>> isn't
> >>>>> >>>>>>>>>> useful for
> >>>>> >>>>>>>>>>>          the Aligned Checkpoint.
> >>>>> >>>>>>>>>>>       4. For the legacy source
> >>>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses
> >>>>> up to
> >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
> >>>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the
> >>>>> overdraft
> >>>>> >>>>> buffer.
> >>>>> >>>>>>>>>>>          - Do you think it's ok?
> >>>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic
> for
> >>>>> >>>>>>> LegacySource
> >>>>> >>>>>>>>>> in our
> >>>>> >>>>>>>>>>>          internal version. It works well.
> >>>>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions?
> I
> >>>>> >>>>> submitted
> >>>>> >>>>>>> the
> >>>>> >>>>>>>>>> PR
> >>>>> >>>>>>>>>>>       [1].
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> Thanks
> >>>>> >>>>>>>>>>> fanrui
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <
> >>>>> >>>>>>> kaa....@yandex.com
> >>>>> >>>>>>>>>>> wrote:
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>>> Hi,
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are
> >>>>> >>>>>>>>>>>> some conclusions:
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
> >>>>> >>>>>>>>>>>> This means that if we request a memory segment from
> >>>>> >>>>>>>>>>>> LocalBufferPool and maxBuffersPerChannel is reached for this
> >>>>> >>>>>>>>>>>> channel, we just ignore that and continue to allocate
> >>>>> >>>>>>>>>>>> buffers while LocalBufferPool has them (this is actually not
> >>>>> >>>>>>>>>>>> an overdraft).
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
> >>>>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
> >>>>> >>>>>>>>>>>> to limit the overdraft number (because it is not ok if one
> >>>>> >>>>>>>>>>>> subtask allocates all buffers of one TaskManager,
> >>>>> >>>>>>>>>>>> considering that several different jobs can be submitted to
> >>>>> >>>>>>>>>>>> this TaskManager). So the idea is to have
> >>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
> >>>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the limit of buffers in
> >>>>> >>>>>>>>>>>> LocalBufferPool is reached, LocalBufferPool can request up
> >>>>> >>>>>>>>>>>> to maxOverdraftBuffersPerPartition additional buffers from
> >>>>> >>>>>>>>>>>> NetworkBufferPool.
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since
> >>>>> >>>>>>>>>>>> it actually works as an unbounded flatmap and will always
> >>>>> >>>>>>>>>>>> run in overdraft mode, which is not the goal. So we still
> >>>>> >>>>>>>>>>>> need to think about that.
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> 29.04.2022 11:11, rui fan wrote:
> >>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
> >>>>> >>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number
> >>>>> >>>>>>>>>>>>> of overdraft segments that each LocalBufferPool can
> >>>>> >>>>>>>>>>>>> request, right?
> >>>>> >>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>> I prefer to hard-code the maxOverdraftBuffers so that we
> >>>>> >>>>>>>>>>>>> don't add a new configuration. And I hope to hear more
> >>>>> >>>>>>>>>>>>> from the community.
> >>>>> >>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>> Best wishes
> >>>>> >>>>>>>>>>>>> fanrui
> >>>>> >>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <
> >>>>> >>>>> 1996fan...@gmail.com>
> >>>>> >>>>>>>>>> wrote:
> >>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally
> >>>>> >>>>>>>>>>>>>> right. The 'maxBuffersNumber - buffersInUseNumber' can be
> >>>>> >>>>>>>>>>>>>> used as the overdraft buffer, so it won't need a new
> >>>>> >>>>>>>>>>>>>> buffer configuration. Flink users can turn up
> >>>>> >>>>>>>>>>>>>> maxBuffersNumber to control the overdraft buffer size.
> >>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we
> >>>>> >>>>>>>>>>>>>> should limit the maximum number of overdraft segments
> >>>>> >>>>>>>>>>>>>> that each LocalBufferPool can request.
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> Why do we limit it?
> >>>>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable`
> >>>>> >>>>>>>>>>>>>> while processing records, such as LegacySource. I
> >>>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there
> >>>>> >>>>>>>>>>>>>> are other cases.
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> If we don't add the limitation, the LegacySource will use
> >>>>> >>>>>>>>>>>>>> up all remaining memory in the NetworkBufferPool when the
> >>>>> >>>>>>>>>>>>>> backpressure is severe.
> >>>>> >>>>>>>>>>>>>> How to limit it?
> >>>>> >>>>>>>>>>>>>> I prefer to hard-code
> >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
> >>>>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers
> >>>>> >>>>>>>>>>>>>> is just for safety, and it should be enough for most
> >>>>> >>>>>>>>>>>>>> Flink jobs. Or we can set
> >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
> >>>>> >>>>>>>>>>>>>> to handle some jobs with low parallelism.
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned
> >>>>> >>>>>>>>>>>>>> Checkpoint, we can set maxOverdraftBuffers=0 in the
> >>>>> >>>>>>>>>>>>>> constructor of LocalBufferPool, because the overdraft
> >>>>> >>>>>>>>>>>>>> isn't useful for the Aligned Checkpoint.
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> Best wishes
> >>>>> >>>>>>>>>>>>>> fanrui
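
The hard-coded sizing rule discussed in the message above is small enough to sketch directly. Hedged: the class and method names here are illustrative; the formula `Math.max(numberOfSubpartitions, 10)` and the `0` for aligned checkpoints are proposals from this thread, not a shipped Flink option.

```java
// Sketch of the sizing rule proposed in this thread; names are illustrative,
// and this is not actual Flink code.
public class OverdraftSizing {
    static int maxOverdraftBuffers(boolean unalignedCheckpointEnabled, int numberOfSubpartitions) {
        if (!unalignedCheckpointEnabled) {
            // Overdraft does not help aligned checkpoints, so disable it.
            return 0;
        }
        // Bounded by the subpartition count, with a small floor so that
        // low-parallelism jobs still get a useful overdraft.
        return Math.max(numberOfSubpartitions, 10);
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(true, 4));    // 10
        System.out.println(maxOverdraftBuffers(true, 128));  // 128
        System.out.println(maxOverdraftBuffers(false, 128)); // 0
    }
}
```

This keeps the bound proportional to the output fan-out while never letting a tiny job fall below a small constant, which matches the "hard-code, no new configuration" preference above.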
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <
> >>>>> >>>>>>>>>> kaa....@yandex.com>
> >>>>> >>>>>>>>>>>>>> wrote:
> >>>>> >>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> Hi fanrui,
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
> >>>>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
> >>>>> >>>>>>>>>>>>>>> thoughts about configuration:
> >>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it,
> >>>>> >>>>>>>>>>>>>>> right now we have the following calculation.
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory
> >>>>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
> >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
> >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
> >>>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive
> >>>>> >>>>>>>>>>>>>>> buffers * parallelism + floating buffers) * number of
> >>>>> >>>>>>>>>>>>>>> subtasks in the TaskManager
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers in use at
> >>>>> >>>>>>>>>>>>>>> the current moment (always <= requiredBuffersNumber)
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
> >>>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably.
> >>>>> >>>>>>>>>>>>>>> But if requiredBuffersNumber > maxBuffersNumber,
> >>>>> >>>>>>>>>>>>>>> sometimes it is also fine (but not good), since not all
> >>>>> >>>>>>>>>>>>>>> required buffers are really mandatory (e.g. it is ok if
> >>>>> >>>>>>>>>>>>>>> Flink cannot allocate floating buffers).
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
> >>>>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover
> >>>>> >>>>>>>>>>>>>>> buffers (maxBuffersNumber - requiredBuffersNumber),
> >>>>> >>>>>>>>>>>>>>> which I propose to use. (We could actually even use the
> >>>>> >>>>>>>>>>>>>>> difference 'requiredBuffersNumber - buffersInUseNumber',
> >>>>> >>>>>>>>>>>>>>> since one TaskManager can contain several operators,
> >>>>> >>>>>>>>>>>>>>> including 'window', which can temporarily borrow buffers
> >>>>> >>>>>>>>>>>>>>> from the global pool.)
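
As a worked example of the calculation above (all concrete numbers below are illustrative assumptions for this sketch, not recommendations or Flink defaults):

```java
// Worked example of the back-of-envelope calculation quoted above.
// All concrete numbers are illustrative assumptions, not recommendations.
public class BufferBudget {
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    static long requiredBuffersNumber(int exclusivePerChannel, int parallelism,
                                      int floatingPerGate, int subtasksInTaskManager) {
        return (long) (exclusivePerChannel * parallelism + floatingPerGate) * subtasksInTaskManager;
    }

    public static void main(String[] args) {
        // Assume 128 MB of network memory and 32 KB segments.
        long max = maxBuffersNumber(128L * 1024 * 1024, 32L * 1024);
        // Assume 2 exclusive buffers per channel, parallelism 100,
        // 8 floating buffers, and 4 subtasks in this TaskManager.
        long required = requiredBuffersNumber(2, 100, 8, 4);
        System.out.println(max);            // 4096
        System.out.println(required);       // 832
        System.out.println(max - required); // 3264 leftover buffers the proposal would tap into
    }
}
```

With these assumed numbers, the leftover pool (maxBuffersNumber - requiredBuffersNumber) is substantial, which is exactly the slack the proposal wants to spend on overdraft.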
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to
> >>>>> >>>>>>>>>>>>>>> requesting buffers while processing a single record;
> >>>>> >>>>>>>>>>>>>>> switching to unavailability between records should stay
> >>>>> >>>>>>>>>>>>>>> the same as we have it now):
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but
> >>>>> >>>>>>>>>>>>>>> maxBuffersPerChannel is reached, then just ignore this
> >>>>> >>>>>>>>>>>>>>> limitation and allocate the buffer from any place (from
> >>>>> >>>>>>>>>>>>>>> LocalBufferPool if it still has something, otherwise
> >>>>> >>>>>>>>>>>>>>> from NetworkBufferPool).
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily
> >>>>> >>>>>>>>>>>>>>> allocate from NetworkBufferPool while it still has
> >>>>> >>>>>>>>>>>>>>> something to allocate.
> >>>>> >>>>>>>>>>>>>>>
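
A minimal sketch of the two rules above, assuming simplified stand-in pools (these classes are invented for illustration and are not Flink's actual LocalBufferPool/NetworkBufferPool):

```java
// Simplified stand-in pools illustrating the proposed request order;
// these are not Flink's LocalBufferPool/NetworkBufferPool.
public class OverdraftRequestSketch {
    static class GlobalPool {
        int available;
        GlobalPool(int available) { this.available = available; }
        boolean tryTake() {
            if (available > 0) { available--; return true; }
            return false;
        }
    }

    static class LocalPool {
        final int limit;       // normal per-pool budget
        int inUse;             // buffers within the budget
        int overdraft;         // buffers temporarily borrowed beyond the budget
        final GlobalPool global;
        LocalPool(int limit, GlobalPool global) { this.limit = limit; this.global = global; }

        // While processing a single record: maxBuffersPerChannel is ignored,
        // and past the local limit we temporarily borrow from the global pool.
        boolean requestBufferDuringRecord() {
            if (!global.tryTake()) {
                return false; // nothing left anywhere
            }
            if (inUse < limit) { inUse++; } else { overdraft++; }
            return true;
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(3);
        LocalPool local = new LocalPool(2, global);
        System.out.println(local.requestBufferDuringRecord()); // true  (normal, 1/2)
        System.out.println(local.requestBufferDuringRecord()); // true  (normal, 2/2)
        System.out.println(local.requestBufferDuringRecord()); // true  (overdraft)
        System.out.println(local.requestBufferDuringRecord()); // false (global pool exhausted)
    }
}
```

The sketch shows why no new option is strictly needed: the overdraft is bounded implicitly by whatever the global pool still holds.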
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work,
> >>>>> >>>>>>>>>>>>>>> but I like it since, on the one hand, it works from
> >>>>> >>>>>>>>>>>>>>> scratch without any configuration, and on the other
> >>>>> >>>>>>>>>>>>>>> hand, it can be configured by changing the proportion of
> >>>>> >>>>>>>>>>>>>>> maxBuffersNumber and requiredBuffersNumber.
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> The last thing I want to say: I don't really want to
> >>>>> >>>>>>>>>>>>>>> implement a new configuration, since even now it is not
> >>>>> >>>>>>>>>>>>>>> clear how to correctly configure network buffers with
> >>>>> >>>>>>>>>>>>>>> the existing configuration, and I don't want to
> >>>>> >>>>>>>>>>>>>>> complicate it, especially if it is possible to resolve
> >>>>> >>>>>>>>>>>>>>> the problem automatically (as described above).
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> So is my understanding of network memory/buffers correct?
> >>>>> >>>>>>>>>>>>>>> --
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> Best regards,
> >>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
> >>>>> >>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
> >>>>> >>>>>>>>>>>>>>>> Hi everyone,
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major
> feature
> >>>>> of
> >>>>> >>>>>> Flink.
> >>>>> >>>>>>>> It
> >>>>> >>>>>>>>>>>>>>>> effectively solves the problem of checkpoint timeout
> >>>>> or
> >>>>> >>>>> slow
> >>>>> >>>>>>>>>>>>>>>> checkpoint when backpressure is severe.
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work
> >>>>> >>>>>>>>>>>>>>>> well when the backpressure is severe and multiple
> >>>>> >>>>>>>>>>>>>>>> output buffers are required to process a single record.
> >>>>> >>>>>>>>>>>>>>>> FLINK-14396 [2] also mentioned this issue before. So we
> >>>>> >>>>>>>>>>>>>>>> propose the overdraft buffer to solve it.
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> I created FLINK-26762[3] and FLIP-227[4] to detail
> the
> >>>>> >>>>>>> overdraft
> >>>>> >>>>>>>>>>>>>>>> buffer mechanism. After discussing with Anton
> >>>>> >>>> Kalashnikov,
> >>>>> >>>>>>> there
> >>>>> >>>>>>>>>> are
> >>>>> >>>>>>>>>>>>>>>> still some points to discuss:
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>>   * There are already a lot of buffer-related
> >>>>> >>>>>>>>>>>>>>>>     configurations. Do we need to add a new
> >>>>> >>>>>>>>>>>>>>>>     configuration for the overdraft buffer?
> >>>>> >>>>>>>>>>>>>>>>   * Where should the overdraft buffer take memory from?
> >>>>> >>>>>>>>>>>>>>>>   * If the overdraft buffer uses the memory remaining
> >>>>> >>>>>>>>>>>>>>>>     in the NetworkBufferPool, no new configuration
> >>>>> >>>>>>>>>>>>>>>>     needs to be added.
> >>>>> >>>>>>>>>>>>>>>>   * If adding a new configuration:
> >>>>> >>>>>>>>>>>>>>>>       o Should we set the overdraft-memory-size at the
> >>>>> >>>>>>>>>>>>>>>>         TM level or the Task level?
> >>>>> >>>>>>>>>>>>>>>>       o Or set overdraft-buffers to indicate the number
> >>>>> >>>>>>>>>>>>>>>>         of memory segments that can be overdrawn.
> >>>>> >>>>>>>>>>>>>>>>       o What is the default value? How do we set
> >>>>> >>>>>>>>>>>>>>>>         sensible defaults?
> >>>>> >>>>>>>>>>>>>>>> Currently, I have implemented a POC [5] and verified it
> >>>>> >>>>>>>>>>>>>>>> using flink-benchmarks [6]. The POC sets
> >>>>> >>>>>>>>>>>>>>>> overdraft-buffers at the Task level, and the default
> >>>>> >>>>>>>>>>>>>>>> value is 10. That is, each LocalBufferPool can overdraw
> >>>>> >>>>>>>>>>>>>>>> up to 10 memory segments.
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> Thanks,
> >>>>> >>>>>>>>>>>>>>>> fanrui
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>>>>> [1]
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> >>>>> >>>>>>>>>>>>>>>> [2]
> https://issues.apache.org/jira/browse/FLINK-14396
> >>>>> >>>>>>>>>>>>>>>> [3]
> https://issues.apache.org/jira/browse/FLINK-26762
> >>>>> >>>>>>>>>>>>>>>> [4]
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> >>>>> >>>>>>>>>>>>>>>> [5]
> >>>>> >>>>>>>>>>>>>>>>
> >>>>> >>
> >>>>>
> https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
> >>>>> >>>>>>>>>>>>>>>> [6]
> >>>>> https://github.com/apache/flink-benchmarks/pull/54
> >>>>> >>>>>>>>>>>> --
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>> Best regards,
> >>>>> >>>>>>>>>>>> Anton Kalashnikov
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>>>>
> >>>>> >>>>>>>>>> --
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> Best regards,
> >>>>> >>>>>>>>>> Anton Kalashnikov
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>>
> >>>>> >> --
> >>>>> >>
> >>>>> >> Best regards,
> >>>>> >> Anton Kalashnikov
> >>>>> >>
> >>>>> >>
> >>>>>
> >>>>
>
