Hi

I created FLINK-27530[1] as the parent ticket and
added it to the FLIP.

[1] https://issues.apache.org/jira/browse/FLINK-27530

Thanks
fanrui

On Fri, May 6, 2022 at 4:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think?
>
> Can you create a parent ticket for the whole FLIP to group all of the
> issues together?
>
> Also FLIP should be officially voted first.
>
> Best,
> Piotrek
>
> On Fri, May 6, 2022 at 09:08, rui fan <1996fan...@gmail.com> wrote:
>
> > Hi Anton, Piotrek and Dawid,
> >
> > Thanks for your help.
> >
> > I created FLINK-27522[1] as the first task. And I will finish it asap.
> >
> > @Piotrek, for the default value, do you think it should be less
> > than 5? What do you think about 3? Actually, I don't think 5 is big.
> > Whether it is 1, 3 or 5 doesn't matter much; the focus is on
> > reasonably resolving the deadlock problems. Alternatively, I can move
> > the second task forward first and we discuss the default value in the PR.
> >
> > For the legacySource, I got your idea. And I propose we create
> > the third task to handle it. Because it is independent and for
> > compatibility with the old API. What do you think? I updated
> > the third task on FLIP-227[2].
> >
> > If all is ok, I will create a JIRA for the third Task and add it to
> > FLIP-227. And I will develop them from the first task to the
> > third task.
> >
> > Thanks again for your help.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-27522
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> >
> > Thanks
> > fanrui
> >
> > On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi fanrui,
> > >
> > > > How to identify legacySource?
> > >
> > > legacy sources are always using the SourceStreamTask class and
> > > SourceStreamTask is used only for legacy sources. But I'm not sure how to
> > > enable/disable that. Adding a `disableOverdraft()` call in SourceStreamTask
> > > would be better compared to relying on the `getAvailableFuture()` call
> > > (isn't it used for back pressure metric anyway?). Ideally we should
> > > enable/disable it in the constructors, but that might be tricky.
> > >
> > > > I prefer it to be between 5 and 10
> > >
> > > I would vote for a smaller value because of FLINK-13203
> > >
> > > Piotrek
> > >
> > >
> > >
> > > On Thu, May 5, 2022 at 11:49, rui fan <1996fan...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> Thanks a lot for your discussion.
> > >>
> > >> After several discussions, I think it's clear now. I updated the
> > >> "Proposed Changes" of FLIP-227[1]. If I have something
> > >> missing, please help to add it to FLIP, or add it in the mail
> > >> and I can add it to FLIP. If everything is OK, I will create a
> > >> new JIRA for the first task, and use FLINK-26762[2] as the
> > >> second task.
> > >>
> > >> About the legacy source, do we set maxOverdraftBuffersPerGate=0
> > >> directly? How to identify legacySource? Or could we add
> > >> an overdraftEnabled flag to LocalBufferPool? Its default value
> > >> would be false, and a call to getAvailableFuture would set
> > >> overdraftEnabled=true.
> > >> The flag indicates whether isAvailable is checked elsewhere.
> > >> This might be more general and cover more cases.
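
A minimal sketch of the overdraftEnabled idea, assuming a toy pool rather than Flink's real LocalBufferPool. The class `OverdraftFlagSketch`, its fields and its `requestBuffer()` method are hypothetical names used only for illustration; only `getAvailableFuture` and `overdraftEnabled` come from the proposal above.

```java
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a buffer pool; NOT Flink's LocalBufferPool. */
final class OverdraftFlagSketch {
    private final int poolSize;             // regular buffers of this pool
    private final int maxOverdraftBuffers;  // extra buffers allowed on top
    private int usedBuffers;
    // Proposed flag: stays false until some caller checks availability.
    private boolean overdraftEnabled = false;

    OverdraftFlagSketch(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Tasks that check availability opt in to the overdraft as a side effect. */
    CompletableFuture<Void> getAvailableFuture() {
        overdraftEnabled = true;
        return usedBuffers < poolSize
                ? CompletableFuture.<Void>completedFuture(null)
                : new CompletableFuture<Void>();
    }

    /** Returns true if a buffer was granted, false if the caller would block. */
    boolean requestBuffer() {
        int limit = poolSize + (overdraftEnabled ? maxOverdraftBuffers : 0);
        if (usedBuffers < limit) {
            usedBuffers++;
            return true;
        }
        return false;
    }
}
```

In this sketch a caller that never calls `getAvailableFuture()` (the legacy source case) is capped at `poolSize`, while a caller that does check availability may take up to `poolSize + maxOverdraftBuffers`.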
> > >>
> > >> Also, I think the default value of 'max-overdraft-buffers-per-gate'
> > >> needs to be confirmed. I prefer it to be between 5 and 10. What
> > >> do you think?
> > >>
> > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> > >> [2] https://issues.apache.org/jira/browse/FLINK-26762
> > >>
> > >> Thanks
> > >> fanrui
> > >>
> > >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi again,
> > >>>
> > >>> After sleeping on this, if both versions (reserve and overdraft) have
> > >>> the same complexity, I would also prefer the overdraft.
> > >>>
> > >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
> > >>> > Dawid mentioned, I think it is dangerous since it is too implicit for
> > >>> > the user and if the user submits one more job for the same TaskManager
> > >>>
> > >>> As I mentioned, it's not only an issue with multiple jobs. The same
> > >>> problem can happen with different subtasks from the same job, potentially
> > >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be
> > >>> in favour of Integer.MAX_VALUE to be the default value, but as it is, I
> > >>> think we should indeed play on the safe side and limit it.
> > >>>
> > >>> > I still don't understand how should be limited "reserve"
> > >>> implementation.
> > >>> > I mean if we have X buffers in total and the user sets overdraft
> > equal
> > >>> > to X we obviously can not reserve all buffers, but how many we are
> > >>> > allowed to reserve? Should it be a different configuration like
> > >>> > percentegeForReservedBuffers?
> > >>>
> > >>> The reserve could be defined as a percentage, or as a fixed number of
> > >>> buffers. But yes. In normal operation a subtask would not use the reserve, as
> > >>> if numberOfAvailableBuffers < reserve, the output would not be available.
> > >>> Only in the flatMap/timers/huge records case could the reserve be used.
> > >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers,
> > >>> will LocalBufferPool never be available? Can't process data?
> > >>>
> > >>> Of course we would need to make sure that never happens. So the reserve
> > >>> should be < total buffer size.
> > >>>
> > >>> > 2. If the overdraft buffer use the extra buffers, when the
> downstream
> > >>> > task inputBuffer is insufficient, it should fail to start the job,
> > and
> > >>> then
> > >>> > restart? When the InputBuffer is initialized, it will apply for
> > enough
> > >>> > buffers, right?
> > >>>
> > >>> The failover if downstream can not allocate buffers is already
> > >>> implemented in FLINK-14872 [2]. There is a timeout for how long the task is
> > >>> waiting for buffer allocation. However, this doesn't prevent many
> > >>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
> > >>> solution for [1] would be 2b described in the ticket:
> > >>>
> > >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING.
> This
> > >>> is a simplified version of 2a, without tracking the tasks
> > sink-to-source.
> > >>>
> > >>> But that's a pre-existing problem and I don't think we have to solve it
> > >>> before implementing overdraft. I think we would need to solve it only
> > >>> before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I
> > >>> would hesitate setting the overdraft to anything more than a couple of
> > >>> buffers by default for the same reason.
> > >>>
> > >>> > Actually, I totally agree that we don't need a lot of buffers for
> > >>> overdraft
> > >>>
> > >>> and
> > >>>
> > >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> > >>> > When we finish this feature and after users use it, if users
> feedback
> > >>> > this issue we can discuss again.
> > >>>
> > >>> +1
> > >>>
> > >>> Piotrek
> > >>>
> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13203
> > >>> [2] https://issues.apache.org/jira/browse/FLINK-14872
> > >>>
> > >>> On Thu, May 5, 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
> > >>>
> > >>>> Hi everyone,
> > >>>>
> > >>>> I still have some questions.
> > >>>>
> > >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers,
> > will
> > >>>> LocalBufferPool never be available? Can't process data?
> > >>>> 2. If the overdraft buffer use the extra buffers, when the
> downstream
> > >>>> task inputBuffer is insufficient, it should fail to start the job,
> and
> > >>>> then
> > >>>> restart? When the InputBuffer is initialized, it will apply for
> enough
> > >>>> buffers, right?
> > >>>>
> > >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> > >>>> When we finish this feature and after users use it, if users
> feedback
> > >>>> this issue we can discuss again.
> > >>>>
> > >>>> Thanks
> > >>>> fanrui
> > >>>>
> > >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <
> > dwysakow...@apache.org>
> > >>>> wrote:
> > >>>>
> > >>>>> Hey all,
> > >>>>>
> > >>>>> I have not replied in the thread yet, but I was following the
> > >>>>> discussion.
> > >>>>>
> > >>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it,
> > >>>>> the idea to distinguish between inside flatMap & outside would be fairly
> > >>>>> simple, but maybe slightly indirect. The checkAvailability would remain
> > >>>>> unchanged and it is checked always between separate invocations of the
> > >>>>> UDF. Therefore the overdraft buffers would not apply there. However, once
> > >>>>> the pool says it is available, it means it has at least an initial
> > >>>>> buffer. So any additional request without checking for availability can
> > >>>>> be considered to be inside of processing a single record. This does not
> > >>>>> hold just for the LegacySource as I don't think it actually checks for
> > >>>>> the availability of buffers in the LocalBufferPool.
> > >>>>>
> > >>>>> In the offline chat with Anton, we also discussed if we need a limit of
> > >>>>> the number of buffers we could overdraft (or in other words if the limit
> > >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
> > >>>>> on the safe side and have it limited. The pool of network buffers is
> > >>>>> shared for the entire TaskManager, so it means it can be shared even
> > >>>>> across tasks of separate jobs. However, I might be just unnecessarily
> > >>>>> cautious here.
> > >>>>>
> > >>>>> Best,
> > >>>>>
> > >>>>> Dawid
> > >>>>>
> > >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
> > >>>>> > Hi,
> > >>>>> >
> > >>>>> > Thanks for the answers.
> > >>>>> >
> > >>>>> >> we may still need to discuss whether the
> > >>>>> >> overdraft/reserve/spare should use extra buffers or buffers
> > >>>>> >> in (exclusive + floating buffers)?
> > >>>>> > and
> > >>>>> >
> > >>>>> >> These things resolve the different problems (at least as I see
> > >>>>> that).
> > >>>>> >> The current hardcoded "1"  says that we switch "availability" to
> > >>>>> >> "unavailability" when one more buffer is left(actually a little
> > less
> > >>>>> >> than one buffer since we write the last piece of data to this
> last
> > >>>>> >> buffer). The overdraft feature doesn't change this logic we
> still
> > >>>>> want
> > >>>>> >> to switch to "unavailability" in such a way but if we are
> already
> > in
> > >>>>> >> "unavailability" and we want more buffers then we can take
> > >>>>> "overdraft
> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we
> need
> > >>>>> to
> > >>>>> >> understand when we should switch to "unavailability"
> > >>>>> > Ok, I see. So it seems to me that both of you have in mind to
> keep
> > >>>>> the
> > >>>>> > buffer pools as they are right now, but if we are in the middle
> of
> > >>>>> > processing a record, we can request extra overdraft buffers on
> top
> > of
> > >>>>> > those? This is another way to implement the overdraft to what I
> was
> > >>>>> > thinking. I was thinking about something like keeping the
> > >>>>> "overdraft" or
> > >>>>> > more precisely buffer "reserve" in the buffer pool. I think my
> > >>>>> version
> > >>>>> > would be easier to implement, because it is just fiddling with
> > >>>>> min/max
> > >>>>> > buffers calculation and slightly modified `checkAvailability()`
> > >>>>> logic.
> > >>>>> >
> > >>>>> > On the other hand, what you have in mind would better utilise the available
> > >>>>> > memory, right? It would require more code changes (how would we know when
> > >>>>> > we are allowed to request the overdraft?). However, in this case, I would
> > >>>>> > be tempted to set the number of overdraft buffers by default to
> > >>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers as
> > >>>>> > necessary. The only downside that I can think of (apart from higher
> > >>>>> > complexity) would be a higher chance of hitting a known/unsolved deadlock [1]
> > >>>>> > in a scenario:
> > >>>>> > - downstream task hasn't yet started
> > >>>>> > - upstream task requests overdraft and uses all available memory segments
> > >>>>> > from the global pool
> > >>>>> > - upstream task is blocked, because downstream task hasn't started yet and
> > >>>>> > can not consume any data
> > >>>>> > - downstream task tries to start, but can not, as there are no available
> > >>>>> > buffers
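
The scenario above can be sketched as simple arithmetic over a shared global pool. This is only an illustration under stated assumptions: `OverdraftDeadlockSketch`, `downstreamCanStart` and all parameter names are hypothetical, and the real allocation logic in Flink is considerably more involved.

```java
/** Back-of-the-envelope model of the scenario; NOT Flink's allocation logic. */
final class OverdraftDeadlockSketch {
    /**
     * The upstream task grabs buffers first (regular pool plus overdraft),
     * then the downstream task needs downstreamRequired segments to start.
     */
    static boolean downstreamCanStart(
            int globalSegments,
            int upstreamPoolSize,
            int upstreamDemand,
            int maxOverdraft,
            int downstreamRequired) {
        // long arithmetic so that maxOverdraft = Integer.MAX_VALUE cannot overflow
        long upstreamLimit = (long) upstreamPoolSize + maxOverdraft;
        long upstreamTaken =
                Math.min(Math.min((long) upstreamDemand, upstreamLimit), (long) globalSegments);
        return globalSegments - upstreamTaken >= downstreamRequired;
    }
}
```

With 100 global segments, an upstream pool of 10 and a demand of 1000 buffers, `downstreamCanStart(100, 10, 1000, Integer.MAX_VALUE, 10)` is false because the upstream task drains the global pool, while `downstreamCanStart(100, 10, 1000, 5, 10)` is true because the overdraft is capped.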
> > >>>>> >
> > >>>>> >> BTW, for watermark, the number of buffers it needs is
> > >>>>> >> numberOfSubpartitions. So if
> > overdraftBuffers=numberOfSubpartitions,
> > >>>>> >> the watermark won't block in requestMemory.
> > >>>>> > and
> > >>>>> >
> > >>>>> >> the best overdraft size will be equal to parallelism.
> > >>>>> > That's a lot of buffers. I don't think we need that many for
> > >>>>> broadcasting
> > >>>>> > watermarks. Watermarks are small, and remember that every
> > >>>>> subpartition has
> > >>>>> > some partially filled/empty WIP buffer, so the vast majority of
> > >>>>> > subpartitions will not need to request a new buffer.
> > >>>>> >
> > >>>>> > Best,
> > >>>>> > Piotrek
> > >>>>> >
> > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
> > >>>>> >
> > >>>>> > On Tue, May 3, 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
> > >>>>> >
> > >>>>> >> Hi,
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>   >> Do you mean to ignore it while processing records, but keep
> > >>>>> using
> > >>>>> >> `maxBuffersPerChannel` when calculating the availability of the
> > >>>>> output?
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> Yes, it is correct.
> > >>>>> >>
> > >>>>> >>
> > >>>>> >>   >> Would it be a big issue if we changed it to check if at
> least
> > >>>>> >> "overdraft number of buffers are available", where "overdraft
> > >>>>> number" is
> > >>>>> >> configurable, instead of the currently hardcoded value of "1"?
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> These things resolve the different problems (at least as I see
> > >>>>> that).
> > >>>>> >> The current hardcoded "1"  says that we switch "availability" to
> > >>>>> >> "unavailability" when one more buffer is left(actually a little
> > less
> > >>>>> >> than one buffer since we write the last piece of data to this
> last
> > >>>>> >> buffer). The overdraft feature doesn't change this logic we
> still
> > >>>>> want
> > >>>>> >> to switch to "unavailability" in such a way but if we are
> already
> > in
> > >>>>> >> "unavailability" and we want more buffers then we can take
> > >>>>> "overdraft
> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we
> need
> > >>>>> to
> > >>>>> >> understand when we should switch to "unavailability"
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> -- About "reserve" vs "overdraft"
> > >>>>> >>
> > >>>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
> > >>>>> >> equal to parallelism. Also, the user can set any value he wants. So even
> > >>>>> >> if parallelism is small (~5) but the user's flatmap produces a lot of
> > >>>>> >> data, the user can set 10 or even more, which almost doubles the max
> > >>>>> >> buffers, and it will be impossible to reserve. At least we need to figure
> > >>>>> >> out how to protect from such cases (a limit for the overdraft?). So
> > >>>>> >> actually it looks even more difficult than increasing the maximum buffers.
> > >>>>> >>
> > >>>>> >> I want to emphasize that overdraft buffers are a soft configuration, which
> > >>>>> >> means the pool takes as many buffers as the global buffer pool has
> > >>>>> >> available (maybe zero) but no more than this configured value. It is also
> > >>>>> >> important to notice that perhaps not many subtasks in a TaskManager will
> > >>>>> >> be using this feature, so we don't actually need a lot of available
> > >>>>> >> buffers for every subtask (here, I mean that if we have only one
> > >>>>> >> window/flatmap operator and many other operators, then one TaskManager
> > >>>>> >> will have many ordinary subtasks which don't actually need overdraft and
> > >>>>> >> several subtasks that need this feature). But in case of reservation,
> > >>>>> >> we will reserve some buffers for all operators even if they don't really
> > >>>>> >> need it.
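
A minimal sketch of the "soft configuration" semantics, assuming the overdraft grant is simply bounded by both the global pool and the configured limit. The class `SoftOverdraftSketch`, the method `grantOverdraft` and its parameters are hypothetical names, not Flink APIs.

```java
/** Illustrates a soft overdraft limit; NOT Flink's implementation. */
final class SoftOverdraftSketch {
    /**
     * Returns how many overdraft buffers are granted for one request:
     * never more than the global pool currently has, and never more than
     * what is left of the configured overdraft limit. May be zero.
     */
    static int grantOverdraft(
            int requested, int globalAvailable, int overdraftInUse, int maxOverdraftBuffers) {
        int remainingQuota = Math.max(0, maxOverdraftBuffers - overdraftInUse);
        int granted = Math.min(requested, Math.min(globalAvailable, remainingQuota));
        return Math.max(0, granted);
    }
}
```

Nothing is reserved up front in this model: a subtask that never asks for overdraft costs the global pool nothing, which is the contrast with the reservation approach described above.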
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> -- Legacy source problem
> > >>>>> >>
> > >>>>> >> If we still want to change max buffers then it is a problem for
> > >>>>> >> LegacySources (since every subtask of a source will always use this
> > >>>>> >> overdraft). But right now, I think that we can force 0 overdraft
> > >>>>> >> buffers for legacy subtasks in the configuration during execution (if it is
> > >>>>> >> not too late to change the configuration at this point).
> > >>>>> >>
> > >>>>> >>
> > >>>>> >> On 03.05.2022 14:11, rui fan wrote:
> > >>>>> >>> Hi
> > >>>>> >>>
> > >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback.  I agree with
> > >>>>> >>> ignoring the legacy source, it will affect our design. User
> > should
> > >>>>> >>> use the new Source Api as much as possible.
> > >>>>> >>>
> > >>>>> >>> Hi Piotrek, we may still need to discuss whether the
> > >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers
> > >>>>> >>> in (exclusive + floating buffers)? They have some differences.
> > >>>>> >>>
> > >>>>> >>> If it uses extra buffers:
> > >>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1
> > >>>>> >>>    <= currentPoolSize) and all subpartitions don't reach the
> > >>>>> >>> maxBuffersPerChannel.
> > >>>>> >>>
> > >>>>> >>> If it uses the buffers in (exclusive + floating buffers):
> > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
> > >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
> > >>>>> >>> don't reach the maxBuffersPerChannel.
> > >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8),
> > the
> > >>>>> >>> usedBuffers will be small. That is the LocalBufferPool will be
> > >>>>> >>> easily unavailable. For throughput, if users turn up the
> > >>>>> >>> overdraft buffers, they need to turn up exclusive or floating
> > >>>>> >>> buffers. It also affects the InputChannel, and it is unfriendly
> > >>>>> >>> to users.
> > >>>>> >>>
> > >>>>> >>> So I prefer the overdraft to use extra buffers.
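
The two availability conditions compared above can be put side by side in a small sketch. It deliberately leaves out the maxBuffersPerChannel check that both variants share, and `AvailabilitySketch` with its method names is a hypothetical illustration, not code from LocalBufferPool.

```java
/** Side-by-side view of the two availability rules; NOT Flink's code. */
final class AvailabilitySketch {
    /** Overdraft uses extra buffers: available while one regular buffer is free. */
    static boolean availableWithExtraBuffers(int usedBuffers, int currentPoolSize) {
        return usedBuffers + 1 <= currentPoolSize;
    }

    /** Overdraft comes out of (exclusive + floating): a reserve must stay free. */
    static boolean availableWithSharedBuffers(
            int usedBuffers, int overdraftBuffers, int currentPoolSize) {
        return usedBuffers + overdraftBuffers <= currentPoolSize;
    }
}
```

Taking an illustrative pool of 18 buffers and overdraftBuffers=10, the shared variant turns unavailable once more than 8 buffers are in use, whereas the extra-buffer variant stays available until 17 are in use. That is the throughput concern for low-parallelism jobs raised in point 2.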
> > >>>>> >>>
> > >>>>> >>>
> > >>>>> >>> BTW, for watermark, the number of buffers it needs is
> > >>>>> >>> numberOfSubpartitions. So if
> > >>>>> overdraftBuffers=numberOfSubpartitions,
> > >>>>> >>> the watermark won't block in requestMemory. But it has
> > >>>>> >>> 2 problems:
> > >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
> > >>>>> >>> (exclusive + floating buffers),  there will be fewer buffers
> > >>>>> >>> available. Throughput may be affected.
> > >>>>> >>> 2. The numberOfSubpartitions is different for each Task.
> > >>>>> >>> So if users want to cover watermarks using this feature,
> > >>>>> >>> they don't know how to set the overdraftBuffers more
> > >>>>> >>> reasonably. And if the parallelism is changed, users still
> > >>>>> >>> need to change overdraftBuffers. It is unfriendly to users.
> > >>>>> >>>
> > >>>>> >>> So I propose we support overdraftBuffers=-1. It means
> > >>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
> > >>>>> >>> in the constructor of LocalBufferPool.
> > >>>>> >>>
> > >>>>> >>> Please correct me if I'm wrong.
> > >>>>> >>>
> > >>>>> >>> Thanks
> > >>>>> >>> fanrui
> > >>>>> >>>
> > >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <
> > >>>>> pnowoj...@apache.org>
> > >>>>> >> wrote:
> > >>>>> >>>> Hi fanrui,
> > >>>>> >>>>
> > >>>>> >>>>> Do you mean don't add the extra buffers? We just use
> (exclusive
> > >>>>> >> buffers *
> > >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be
> > >>>>> available
> > >>>>> >>>> when
> > >>>>> >>>>> (usedBuffers+overdraftBuffers <=
> > >>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers)
> > >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel,
> > >>>>> right?
> > >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum number of the
> > >>>>> >>>> required buffers, just as we did when we were implementing the non-blocking
> > >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back then we
> > >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this logic is
> > >>>>> >>>> located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
> > >>>>> >>>>
> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
> > >>>>> >>>> For performance reasons, we always require at least one buffer per
> > >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we require 5
> > >>>>> >>>> overdraft buffers for output to be available, we need to have them on top
> > >>>>> >>>> of those "one buffer per sub-partition". So the logic should be changed to:
> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
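
The proposed change to the minimum can be tried out in isolation with a small sketch. Only the expression itself comes from the message above; the wrapper class `MinBuffersSketch`, the method `minBuffers` and the sample numbers are assumptions for illustration, not the actual NettyShuffleUtils code.

```java
/** Stand-alone version of the proposed minimum; NOT the NettyShuffleUtils code. */
final class MinBuffersSketch {
    static int minBuffers(
            boolean isSortShuffle,
            int sortShuffleMinBuffers,
            int numSubpartitions,
            int numOverdraftBuffers) {
        // One buffer per sub-partition plus the buffers kept for overdraft.
        return isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
    }
}
```

With 100 sub-partitions, `numOverdraftBuffers = 1` reproduces today's `numSubpartitions + 1` (101), and `numOverdraftBuffers = 5` raises the minimum to 105. The sort-shuffle branch is unaffected.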
> > >>>>> >>>>
> > >>>>> >>>> Regarding increasing the number of max buffers I'm not sure.
> As
> > >>>>> long as
> > >>>>> >>>> "overdraft << max number of buffers", because all buffers on
> the
> > >>>>> outputs
> > >>>>> >>>> are shared across all sub-partitions. If we have 5 overdraft
> > >>>>> buffers,
> > >>>>> >> and
> > >>>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of
> > >>>>> things if
> > >>>>> >> we
> > >>>>> >>>> make the output available if at least one single buffer is
> > >>>>> available or
> > >>>>> >> at
> > >>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So
> > >>>>> effects of
> > >>>>> >>>> increasing the overdraft from 1 to for example 5 should be
> > >>>>> negligible.
> > >>>>> >> For
> > >>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5
> > still
> > >>>>> >> increases
> > >>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max
> as
> > >>>>> it is?
> > >>>>> >>>>
> > >>>>> >>>> If so, maybe we should change the name from "overdraft" to
> > "buffer
> > >>>>> >> reserve"
> > >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept
> > in
> > >>>>> >> reserve
> > >>>>> >>>> in case of flatMap/firing timers/huge records"?
> > >>>>> >>>>
> > >>>>> >>>> What do you think, Fanrui, Anton?
> > >>>>> >>>>
> > >>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new features,
> > >>>>> >>>> as long as we don't break the existing deployments too much.
> > >>>>> >>>>
> > >>>>> >>>> Best,
> > >>>>> >>>> Piotrek
> > >>>>> >>>>
> > >>>>> >>>> On Tue, May 3, 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
> > >>>>> >>>>> Hi everyone,
> > >>>>> >>>>>
> > >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources:
> > >>>>> IMHO, we
> > >>>>> >>>> should
> > >>>>> >>>>> not focus too much on improving/adding capabilities for
> legacy
> > >>>>> sources.
> > >>>>> >>>> We
> > >>>>> >>>>> want to persuade and push users to use the new Source API.
> Yes,
> > >>>>> this
> > >>>>> >>>> means
> > >>>>> >>>>> that there's work required by the end users to port any
> custom
> > >>>>> source
> > >>>>> >> to
> > >>>>> >>>>> the new interface. The benefits of the new Source API should
> > >>>>> outweigh
> > >>>>> >>>> this.
> > >>>>> >>>>> Anything that we build to support multiple interfaces means
> > >>>>> adding more
> > >>>>> >>>>> complexity and more possibilities for bugs. Let's try to make
> > our
> > >>>>> >> lives a
> > >>>>> >>>>> little bit easier.
> > >>>>> >>>>>
> > >>>>> >>>>> Best regards,
> > >>>>> >>>>>
> > >>>>> >>>>> Martijn Visser
> > >>>>> >>>>> https://twitter.com/MartijnVisser82
> > >>>>> >>>>> https://github.com/MartijnVisser
> > >>>>> >>>>>
> > >>>>> >>>>>
> > >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com>
> > >>>>> wrote:
> > >>>>> >>>>>
> > >>>>> >>>>>> Hi Piotrek
> > >>>>> >>>>>>
> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
> > >>>>> using
> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of
> > the
> > >>>>> >>>> output?
> > >>>>> >>>>>> I think yes, and please Anton Kalashnikov to help double
> > check.
> > >>>>> >>>>>>
> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it
> a
> > >>>>> big
> > >>>>> >>>>> problem
> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we
> > already
> > >>>>> have
> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a
> > single
> > >>>>> >>>> buffer
> > >>>>> >>>>>>> available and this works the same for all tasks (including
> > >>>>> legacy
> > >>>>> >>>>> source
> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check
> if
> > >>>>> at least
> > >>>>> >>>>>>> "overdraft number of buffers are available", where
> "overdraft
> > >>>>> number"
> > >>>>> >>>>> is
> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of
> > "1"?
> > >>>>> >>>>>> Do you mean don't add the extra buffers? We just use
> > (exclusive
> > >>>>> >>>> buffers *
> > >>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be
> > >>>>> available
> > >>>>> >>>>> when
> > >>>>> >>>>>> (usedBuffers+overdraftBuffers <=
> > >>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers)
> > >>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel,
> > >>>>> right?
> > >>>>> >>>>>>
> > >>>>> >>>>>> If yes, I think it can solve the problem of legacy source.
> > >>>>> There may
> > >>>>> >> be
> > >>>>> >>>>>> some impact. If overdraftBuffers is large and only one
> buffer
> > >>>>> is used
> > >>>>> >>>> to
> > >>>>> >>>>>> process a single record, exclusive buffers*parallelism +
> > >>>>> floating
> > >>>>> >>>> buffers
> > >>>>> >>>>>> cannot be used. It may only be possible to use (exclusive
> > >>>>> buffers *
> > >>>>> >>>>>> parallelism
> > >>>>> >>>>>> + floating buffers - overdraft buffers + 1). For throughput,
> > if
> > >>>>> turn
> > >>>>> >> up
> > >>>>> >>>>> the
> > >>>>> >>>>>> overdraft buffers, the flink user needs to turn up exclusive
> > or
> > >>>>> >>>> floating
> > >>>>> >>>>>> buffers. And it also affects the InputChannel.
> > >>>>> >>>>>>
> > >>>>> >>>>>> If not, I don't think it can solve the problem of legacy
> > >>>>> source. The
> > >>>>> >>>>> legacy
> > >>>>> >>>>>> source don't check isAvailable, If there are the extra
> > buffers,
> > >>>>> legacy
> > >>>>> >>>>>> source
> > >>>>> >>>>>> will use them up until block in requestMemory.
> > >>>>> >>>>>>
> > >>>>> >>>>>>
> > >>>>> >>>>>> Thanks
> > >>>>> >>>>>> fanrui
> > >>>>> >>>>>>
> > >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <
> > >>>>> pnowoj...@apache.org>
> > >>>>> >>>>>> wrote:
> > >>>>> >>>>>>
> > >>>>> >>>>>>> Hi,
> > >>>>> >>>>>>>
> > >>>>> >>>>>>> +1 for the general proposal from my side. It would be a
> nice
> > >>>>> >>>> workaround
> > >>>>> >>>>>>> flatMaps, WindowOperators and large records issues with
> > >>>>> unaligned
> > >>>>> >>>>>>> checkpoints.
> > >>>>> >>>>>>>
> > >>>>> >>>>>>>> The first task is about ignoring max buffers per channel.
> > This
> > >>>>> >>>> means
> > >>>>> >>>>> if
> > >>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the
> > >>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just
> > >>>>> ignore
> > >>>>> >>>> that
> > >>>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has
> > >>>>> it(it is
> > >>>>> >>>>>>>> actually not a overdraft).
> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep
> > >>>>> using
> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of
> > the
> > >>>>> >>>> output?
> > >>>>> >>>>>>>> The second task is about the real overdraft. I am pretty
> > >>>>> convinced
> > >>>>> >>>>> now
> > >>>>> >>>>>>>> that we, unfortunately, need configuration for limitation
> of
> > >>>>> >>>>> overdraft
> > >>>>> >>>>>>>> number(because it is not ok if one subtask allocates all
> > >>>>> buffers of
> > >>>>> >>>>> one
> > >>>>> >>>>>>>> TaskManager considering that several different jobs can be
> > >>>>> >>>> submitted
> > >>>>> >>>>> on
> > >>>>> >>>>>>>> this TaskManager). So idea is to have
> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
> > >>>>> >>>>>> LocalBufferPool).
> > >>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool
> is
> > >>>>> >>>> reached,
> > >>>>> >>>>>>>> LocalBufferPool can request additionally from
> > >>>>> NetworkBufferPool up
> > >>>>> >>>> to
> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers.
> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it
> a
> > >>>>> big
> > >>>>> >>>>> problem
> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we
> > already
> > >>>>> have
> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a
> > single
> > >>>>> >>>> buffer
> > >>>>> >>>>>>> available and this works the same for all tasks (including
> > >>>>> legacy
> > >>>>> >>>>> source
> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check
> if
> > >>>>> at least
> > >>>>> >>>>>>> "overdraft number of buffers are available", where
> "overdraft
> > >>>>> number"
> > >>>>> >>>>> is
> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of
> > "1"?
> > >>>>> >>>>>>>
> > >>>>> >>>>>>> Best,
> > >>>>> >>>>>>> Piotrek
> > >>>>> >>>>>>>
> > >>>>> >>>>>>> On Fri, Apr 29, 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
> > >>>>> >>>>>>>
> > >>>>> >>>>>>>> Let me add some information about the LegacySource.
> > >>>>> >>>>>>>>
> > >>>>> >>>>>>>> If we want to disable the overdraft buffer for
> LegacySource.
> > >>>>> >>>>>>>> Could we add the enableOverdraft in LocalBufferPool?
> > >>>>> >>>>>>>> The default value is false. If the getAvailableFuture is
> > >>>>> called,
> > >>>>> >>>>>>>> change enableOverdraft=true. It indicates whether there
> are
> > >>>>> >>>>>>>> checks isAvailable elsewhere.
> > >>>>> >>>>>>>>
> > >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct
> > me
> > >>>>> if
> > >>>>> >>>> I'm
> > >>>>> >>>>>>> wrong.
> > >>>>> >>>>>>>> Thanks
> > >>>>> >>>>>>>> fanrui
> > >>>>> >>>>>>>>
> > >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <
> > >>>>> 1996fan...@gmail.com>
> > >>>>> >>>>> wrote:
> > >>>>> >>>>>>>>> Hi,
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> Thanks for your quick response.
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> Questions 1, 2 and 3 are clear to us. We just need to discuss
> > >>>>> >>>>>>>>> the default value in the PR.
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> For the legacy source, you are right: a general implementation
> > >>>>> >>>>>>>>> is difficult. Currently, we implement
> > >>>>> >>>>>>>>> ensureRecordWriterIsAvailable() in SourceFunction.SourceContext
> > >>>>> >>>>>>>>> and call it in our common LegacySources, e.g.
> > >>>>> >>>>>>>>> FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka,
> > >>>>> >>>>>>>>> so fixing FlinkKafkaConsumer solved most of our problems.
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> Core code:
> > >>>>> >>>>>>>>> ```
> > >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
> > >>>>> >>>>>>>>>     // Nothing to wait for without a record writer, when Unaligned
> > >>>>> >>>>>>>>>     // Checkpoints are disabled, or when the writer is available.
> > >>>>> >>>>>>>>>     if (recordWriter == null
> > >>>>> >>>>>>>>>             || !configuration.getBoolean(
> > >>>>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
> > >>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
> > >>>>> >>>>>>>>>         return;
> > >>>>> >>>>>>>>>     }
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>>     // Block until the record writer becomes available again.
> > >>>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
> > >>>>> >>>>>>>>>     try {
> > >>>>> >>>>>>>>>         resumeFuture.get();
> > >>>>> >>>>>>>>>     } catch (Throwable ignored) {
> > >>>>> >>>>>>>>>         // Failures while waiting are deliberately ignored.
> > >>>>> >>>>>>>>>     }
> > >>>>> >>>>>>>>> }
> > >>>>> >>>>>>>>> ```
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
> > >>>>> >>>>>>>>> before synchronized (checkpointLock) and collects records.
> > >>>>> >>>>>>>>> Please let me know if there is a better solution.
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> Thanks
> > >>>>> >>>>>>>>> fanrui
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
> > >>>>> >>>>>>>>>
> > >>>>> >>>>>>>>>> Hi.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two
> > >>>>> >>>>>>>>>>       commits in a PR?
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> Perhaps a separate ticket would be better, since this task
> > >>>>> >>>>>>>>>> raises fewer questions, but we should find a solution for
> > >>>>> >>>>>>>>>> LegacySource first.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
> > >>>>> >>>>>>>>>>       Unaligned Checkpoint, do we ignore max buffers per
> > >>>>> >>>>>>>>>>       channel? Because the overdraft isn't useful for the
> > >>>>> >>>>>>>>>>       Aligned Checkpoint, it still needs to wait for
> > >>>>> >>>>>>>>>>       downstream Task to consume.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> I think the logic should be the same for AC and UC. As I
> > >>>>> >>>>>>>>>> understand, the overdraft may not really help AC, but it
> > >>>>> >>>>>>>>>> doesn't make it worse either.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>>     3. For the second task
> > >>>>> >>>>>>>>>> --      - The default value of maxOverdraftBuffersPerPartition
> > >>>>> >>>>>>>>>>          may also need to be discussed.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> I think it should be a pretty small value, or even 0, since
> > >>>>> >>>>>>>>>> it is a kind of optimization and users should understand what
> > >>>>> >>>>>>>>>> they are doing (especially if we implement the first task).
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint, can
> > >>>>> >>>>>>>>>>          we set the maxOverdraftBuffersPerPartition=0? Because
> > >>>>> >>>>>>>>>>          the overdraft isn't useful for the Aligned Checkpoint.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> Same answer as above: if the overdraft doesn't degrade the
> > >>>>> >>>>>>>>>> Aligned Checkpoint, I don't think we should distinguish
> > >>>>> >>>>>>>>>> between AC and UC.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>>       4. For the legacy source
> > >>>>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it uses up to
> > >>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
> > >>>>> >>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
> > >>>>> >>>>>>>>>>          buffer.
> > >>>>> >>>>>>>>>>          - Do you think it's ok?
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at
> > >>>>> >>>>>>>>>> all, since it can lead to undesirable results, especially if
> > >>>>> >>>>>>>>>> the limit is high. At least, as I understand, it will always
> > >>>>> >>>>>>>>>> work in overdraft mode and will borrow
> > >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool,
> > >>>>> >>>>>>>>>> which can degrade other subtasks on the same TaskManager.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
> > >>>>> >>>>>>>>>>          LegacySource in our internal version. It works well.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> I don't really understand how that is possible in the general
> > >>>>> >>>>>>>>>> case, considering that each user has their own implementation
> > >>>>> >>>>>>>>>> of LegacySourceOperator.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
> > >>>>> >>>>>>>>>>       submitted the PR [1].
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> 29.04.2022 14:14, rui fan wrote:
> > >>>>> >>>>>>>>>>> Hi,
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs or
> > >>>>> >>>>>>>>>>>       two commits in a PR?
> > >>>>> >>>>>>>>>>>       2. For the first task, if the flink user disables the
> > >>>>> >>>>>>>>>>>       Unaligned Checkpoint, do we ignore max buffers per
> > >>>>> >>>>>>>>>>>       channel? Because the overdraft isn't useful for the
> > >>>>> >>>>>>>>>>>       Aligned Checkpoint, it still needs to wait for
> > >>>>> >>>>>>>>>>>       downstream Task to consume.
> > >>>>> >>>>>>>>>>>       3. For the second task
> > >>>>> >>>>>>>>>>>          - The default value of
> > >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition may also need to be
> > >>>>> >>>>>>>>>>>          discussed.
> > >>>>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint, can
> > >>>>> >>>>>>>>>>>          we set the maxOverdraftBuffersPerPartition=0? Because
> > >>>>> >>>>>>>>>>>          the overdraft isn't useful for the Aligned Checkpoint.
> > >>>>> >>>>>>>>>>>       4. For the legacy source
> > >>>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses up to
> > >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
> > >>>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
> > >>>>> >>>>>>>>>>>          buffer.
> > >>>>> >>>>>>>>>>>          - Do you think it's ok?
> > >>>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic for
> > >>>>> >>>>>>>>>>>          LegacySource in our internal version. It works well.
> > >>>>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions? I
> > >>>>> >>>>>>>>>>>       submitted the PR [1].
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>> Thanks
> > >>>>> >>>>>>>>>>> fanrui
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
> > >>>>> >>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> Hi,
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are
> > >>>>> >>>>>>>>>>>> some conclusions:
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
> > >>>>> >>>>>>>>>>>> This means that if we request a memory segment from
> > >>>>> >>>>>>>>>>>> LocalBufferPool and the maxBuffersPerChannel is reached for
> > >>>>> >>>>>>>>>>>> this channel, we just ignore that and continue to allocate
> > >>>>> >>>>>>>>>>>> buffers while LocalBufferPool has them (it is actually not
> > >>>>> >>>>>>>>>>>> an overdraft).
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
> > >>>>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
> > >>>>> >>>>>>>>>>>> option to limit the number of overdraft buffers (because it
> > >>>>> >>>>>>>>>>>> is not OK if one subtask allocates all buffers of one
> > >>>>> >>>>>>>>>>>> TaskManager, considering that several different jobs can be
> > >>>>> >>>>>>>>>>>> submitted to this TaskManager). So the idea is to have
> > >>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
> > >>>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the limit of buffers
> > >>>>> >>>>>>>>>>>> in LocalBufferPool is reached, LocalBufferPool can
> > >>>>> >>>>>>>>>>>> additionally request up to maxOverdraftBuffersPerPartition
> > >>>>> >>>>>>>>>>>> buffers from NetworkBufferPool.
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since
> > >>>>> >>>>>>>>>>>> it actually works as an unlimited flatmap and will always
> > >>>>> >>>>>>>>>>>> work in overdraft mode, which is not the goal. So we still
> > >>>>> >>>>>>>>>>>> need to think about that.
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>      29.04.2022 11:11, rui fan wrote:
> > >>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
> > >>>>> >>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number
> > >>>>> >>>>>>>>>>>>> of overdraft segments that each LocalBufferPool can apply
> > >>>>> >>>>>>>>>>>>> for, right?
> > >>>>> >>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers to avoid adding
> > >>>>> >>>>>>>>>>>>> a new configuration option. And I hope to hear more from
> > >>>>> >>>>>>>>>>>>> the community.
> > >>>>> >>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>> Best wishes
> > >>>>> >>>>>>>>>>>>> fanrui
> > >>>>> >>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:
> > >>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally
> > >>>>> >>>>>>>>>>>>>> right. The 'maxBuffersNumber - buffersInUseNumber' can be
> > >>>>> >>>>>>>>>>>>>> used as the overdraft buffer, so it won't need a new
> > >>>>> >>>>>>>>>>>>>> buffer configuration. Flink users can turn up the
> > >>>>> >>>>>>>>>>>>>> maxBuffersNumber to control the overdraft buffer size.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we
> > >>>>> >>>>>>>>>>>>>> should limit the maximum number of overdraft segments
> > >>>>> >>>>>>>>>>>>>> that each LocalBufferPool can apply for.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Why do we limit it?
> > >>>>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable`
> > >>>>> >>>>>>>>>>>>>> while processing records, such as LegacySource. I
> > >>>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there
> > >>>>> >>>>>>>>>>>>>> are other cases.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Without the limitation, the LegacySource will use up all
> > >>>>> >>>>>>>>>>>>>> remaining memory in the NetworkBufferPool when the
> > >>>>> >>>>>>>>>>>>>> backpressure is severe.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> How to limit it?
> > >>>>> >>>>>>>>>>>>>> I prefer to hard-code
> > >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
> > >>>>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers
> > >>>>> >>>>>>>>>>>>>> is just for safety, and it should be enough for most
> > >>>>> >>>>>>>>>>>>>> Flink jobs. Or we can set
> > >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
> > >>>>> >>>>>>>>>>>>>> to handle some jobs of low parallelism.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned
> > >>>>> >>>>>>>>>>>>>> Checkpoint, we can set maxOverdraftBuffers=0 in the
> > >>>>> >>>>>>>>>>>>>> constructor of LocalBufferPool, because the overdraft
> > >>>>> >>>>>>>>>>>>>> isn't useful for the Aligned Checkpoint.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> Best wishes
> > >>>>> >>>>>>>>>>>>>> fanrui
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com> wrote:
> > >>>>> >>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Hi fanrui,
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
> > >>>>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
> > >>>>> >>>>>>>>>>>>>>> thoughts about configuration:
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it,
> > >>>>> >>>>>>>>>>>>>>> right now we have the following calculation.
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory
> > >>>>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
> > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
> > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
> > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive
> > >>>>> >>>>>>>>>>>>>>> buffers * parallelism + floating buffers) * subtasks
> > >>>>> >>>>>>>>>>>>>>> number in TaskManager
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers in use at
> > >>>>> >>>>>>>>>>>>>>> the current moment (always <= requiredBuffersNumber)
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work
> > >>>>> >>>>>>>>>>>>>>> predictably. But if requiredBuffersNumber >
> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber, it is sometimes also fine (but not
> > >>>>> >>>>>>>>>>>>>>> good), since not all required buffers are really
> > >>>>> >>>>>>>>>>>>>>> mandatory (e.g. it is OK if Flink cannot allocate
> > >>>>> >>>>>>>>>>>>>>> floating buffers).
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
> > >>>>> >>>>>>>>>>>>>>> understand, Flink just never uses these leftover buffers
> > >>>>> >>>>>>>>>>>>>>> (maxBuffersNumber - requiredBuffersNumber), which I
> > >>>>> >>>>>>>>>>>>>>> propose to use. (We can actually even use the difference
> > >>>>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
> > >>>>> >>>>>>>>>>>>>>> TaskManager may contain several operators, including
> > >>>>> >>>>>>>>>>>>>>> 'window', which can temporarily borrow buffers from the
> > >>>>> >>>>>>>>>>>>>>> global pool.)
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to
> > >>>>> >>>>>>>>>>>>>>> requesting buffers while processing a single record;
> > >>>>> >>>>>>>>>>>>>>> switching to unavailability between records should stay
> > >>>>> >>>>>>>>>>>>>>> the same as we have it now):
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but
> > >>>>> >>>>>>>>>>>>>>> maxBuffersPerChannel is reached, then just ignore this
> > >>>>> >>>>>>>>>>>>>>> limitation and allocate the buffer from any place (from
> > >>>>> >>>>>>>>>>>>>>> LocalBufferPool if it still has something, otherwise
> > >>>>> >>>>>>>>>>>>>>> from NetworkBufferPool)
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily
> > >>>>> >>>>>>>>>>>>>>> allocate from NetworkBufferPool while it has something
> > >>>>> >>>>>>>>>>>>>>> to allocate
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work,
> > >>>>> >>>>>>>>>>>>>>> but I like it since, on the one hand, it works out of
> > >>>>> >>>>>>>>>>>>>>> the box without any configuration and, on the other
> > >>>>> >>>>>>>>>>>>>>> hand, it can be configured by changing the proportion
> > >>>>> >>>>>>>>>>>>>>> of maxBuffersNumber and requiredBuffersNumber.
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> The last thing I want to say: I don't really want to
> > >>>>> >>>>>>>>>>>>>>> implement a new configuration option, since even now it
> > >>>>> >>>>>>>>>>>>>>> is not clear how to correctly configure network buffers
> > >>>>> >>>>>>>>>>>>>>> with the existing configuration, and I don't want to
> > >>>>> >>>>>>>>>>>>>>> complicate it, especially if it will be possible to
> > >>>>> >>>>>>>>>>>>>>> resolve the problem automatically (as described above).
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers
> > >>>>> >>>>>>>>>>>>>>> correct?
> > >>>>> >>>>>>>>>>>>>>> --
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> Best regards,
> > >>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
> > >>>>> >>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
> > >>>>> >>>>>>>>>>>>>>>> Hi everyone,
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature
> > >>>>> >>>>>>>>>>>>>>>> of Flink. It effectively solves the problem of
> > >>>>> >>>>>>>>>>>>>>>> checkpoint timeout or slow checkpoint when
> > >>>>> >>>>>>>>>>>>>>>> backpressure is severe.
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work
> > >>>>> >>>>>>>>>>>>>>>> well when the backpressure is severe and multiple
> > >>>>> >>>>>>>>>>>>>>>> output buffers are required to process a single
> > >>>>> >>>>>>>>>>>>>>>> record. FLINK-14396 [2] also mentioned this issue
> > >>>>> >>>>>>>>>>>>>>>> before. So we propose the overdraft buffer to solve it.
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail
> > >>>>> >>>>>>>>>>>>>>>> the overdraft buffer mechanism. After discussing with
> > >>>>> >>>>>>>>>>>>>>>> Anton Kalashnikov, there are still some points to
> > >>>>> >>>>>>>>>>>>>>>> discuss:
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related
> > >>>>> >>>>>>>>>>>>>>>>         configurations. Do we need to add a new
> > >>>>> >>>>>>>>>>>>>>>>         configuration for the overdraft buffer?
> > >>>>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer use memory?
> > >>>>> >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the memory
> > >>>>> >>>>>>>>>>>>>>>>         remaining in the NetworkBufferPool, no new
> > >>>>> >>>>>>>>>>>>>>>>         configuration needs to be added.
> > >>>>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
> > >>>>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at
> > >>>>> >>>>>>>>>>>>>>>>             the TM level or the Task level?
> > >>>>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the
> > >>>>> >>>>>>>>>>>>>>>>             number of memory-segments that can be
> > >>>>> >>>>>>>>>>>>>>>>             overdrawn.
> > >>>>> >>>>>>>>>>>>>>>>           o What is the default value? How to set
> > >>>>> >>>>>>>>>>>>>>>>             sensible defaults?
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it
> > >>>>> >>>>>>>>>>>>>>>> using flink-benchmarks [6]. The POC sets
> > >>>>> >>>>>>>>>>>>>>>> overdraft-buffers at Task level, and the default value
> > >>>>> >>>>>>>>>>>>>>>> is 10. That is: each LocalBufferPool can overdraw up
> > >>>>> >>>>>>>>>>>>>>>> to 10 memory-segments.
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> Thanks,
> > >>>>> >>>>>>>>>>>>>>>> fanrui
> > >>>>> >>>>>>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > >>>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
> > >>>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
> > >>>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> > >>>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
> > >>>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
> > >>>>> >>>>>>>>>>>> --
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>> Best regards,
> > >>>>> >>>>>>>>>>>> Anton Kalashnikov
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>>>>
> > >>>>> >>>>>>>>>> --
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>> Best regards,
> > >>>>> >>>>>>>>>> Anton Kalashnikov
> > >>>>> >>>>>>>>>>
> > >>>>> >>>>>>>>>>
> > >>>>> >> --
> > >>>>> >>
> > >>>>> >> Best regards,
> > >>>>> >> Anton Kalashnikov
> > >>>>> >>
> > >>>>> >>
> > >>>>>
> > >>>>
> >
>
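
For readers skimming the quoted thread, here is a minimal sketch of the
overdraft idea as Anton describes it for the second task: once a local pool
has used its ordinary buffers, it may borrow a bounded number of extra buffers
from the global pool. This is only a toy model under stated assumptions, not
Flink's LocalBufferPool. The class name OverdraftPoolSketch, its fields and
its methods are hypothetical, and the global pool is reduced to a plain
counter.

```java
/** Toy model of the overdraft idea; NOT Flink's actual LocalBufferPool. */
public class OverdraftPoolSketch {
    private final int poolSize;            // ordinary buffers of this local pool
    private final int maxOverdraftBuffers; // hypothetical per-pool overdraft limit
    private int globalFreeBuffers;         // stand-in for free NetworkBufferPool segments
    private int buffersInUse;

    public OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    /** A task would only start a new record while the pool is within its ordinary size. */
    public boolean isAvailable() {
        return buffersInUse < poolSize;
    }

    /** Requests one buffer while a record is being processed. */
    public boolean requestBuffer() {
        if (buffersInUse < poolSize) {
            buffersInUse++;
            return true;
        }
        // Ordinary buffers are exhausted: borrow from the global pool, up to the limit.
        int overdraftInUse = buffersInUse - poolSize;
        if (overdraftInUse < maxOverdraftBuffers && globalFreeBuffers > 0) {
            globalFreeBuffers--;
            buffersInUse++;
            return true;
        }
        return false; // a real pool would make the caller wait here
    }

    /** Returns one buffer; overdraft buffers go back to the global pool first. */
    public void recycleBuffer() {
        if (buffersInUse == 0) {
            throw new IllegalStateException("no buffer in use");
        }
        if (buffersInUse > poolSize) {
            globalFreeBuffers++;
        }
        buffersInUse--;
    }

    public static void main(String[] args) {
        OverdraftPoolSketch pool = new OverdraftPoolSketch(2, 3, 10);
        for (int i = 0; i < 6; i++) {
            System.out.println("request " + i + " granted: " + pool.requestBuffer());
        }
        System.out.println("available for next record: " + pool.isAvailable());
    }
}
```

With a pool size of 2 and an overdraft limit of 3, the first five requests
are granted and the sixth is refused. A source that never checks isAvailable,
like the LegacySource discussed above, would keep running in the overdraft
range, which is why the thread treats it as a separate task.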
