Hi everyone,

I confirmed offline with Anton. He replied in an earlier
email: "I vote for 5 as 'max-overdraft-buffers-per-gate'."
So, as we understand it, everybody agrees on this.

The FLIP-227 discussion is now over, and I've updated FLIP-227.
The work will be split into 3 tickets.
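
For anyone skimming the thread, here is a minimal sketch of the overdraft
behaviour as it is described in the discussion below. It is a toy model and
not Flink's LocalBufferPool: the class OverdraftPoolSketch, its fields and
its method names are hypothetical, and maxOverdraftBuffers merely stands in
for 'max-overdraft-buffers-per-gate'. It assumes the regular buffers are
already assigned to the pool and that only the overdraft comes out of the
shared pool.

```java
/** Toy model of a local buffer pool with a capped overdraft (not Flink code). */
final class OverdraftPoolSketch {
    private final int poolSize;            // regular buffers this pool may hold
    private final int maxOverdraftBuffers; // hypothetical stand-in for the per-gate limit
    private int globalFreeBuffers;         // buffers left in the shared, TaskManager-wide pool
    private int usedBuffers;               // regular buffers currently in use
    private int overdraftBuffers;          // overdraft buffers currently in use

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    /** Availability ignores the overdraft: one more regular buffer must be free. */
    boolean isAvailable() {
        return usedBuffers + 1 <= poolSize;
    }

    /** Requests one buffer; returns false where a real pool would block. */
    boolean requestBuffer() {
        if (usedBuffers < poolSize) {
            usedBuffers++;
            return true;
        }
        // Soft limit: take from the shared pool only if it has buffers left
        // and the configured overdraft cap has not been reached.
        if (overdraftBuffers < maxOverdraftBuffers && globalFreeBuffers > 0) {
            overdraftBuffers++;
            globalFreeBuffers--;
            return true;
        }
        return false;
    }

    int overdraftInUse() {
        return overdraftBuffers;
    }
}
```

With a pool size of 2, a cap of 5 and only 3 free buffers in the shared pool,
the third to fifth requests succeed as overdraft and the sixth fails, because
the overdraft is bounded both by the cap and by what the shared pool has left.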

Thanks again for all the discussion about this FLIP. I will
open a vote today.

Best wishes
fanrui

On Fri, May 6, 2022 at 4:57 PM rui fan <1996fan...@gmail.com> wrote:

> Hi
>
> I created FLINK-27530 [1] as the parent ticket and
> added it to the FLIP.
>
> [1] https://issues.apache.org/jira/browse/FLINK-27530
>
> Thanks
> fanrui
>
> On Fri, May 6, 2022 at 4:27 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think?
>>
>> Can you create a parent ticket for the whole FLIP to group all of the
>> issues together?
>>
>> Also, the FLIP should be officially voted on first.
>>
>> Best,
>> Piotrek
>>
>> On Fri, May 6, 2022 at 09:08, rui fan <1996fan...@gmail.com> wrote:
>>
>> > Hi Anton, Piotrek and Dawid,
>> >
>> > Thanks for your help.
>> >
>> > I created FLINK-27522[1] as the first task. And I will finish it asap.
>> >
>> > @Piotrek, for the default value, do you think it should be less
>> > than 5? What do you think about 3? Actually, I don't think 5 is big.
>> > Whether it is 1, 3 or 5 doesn't matter much; the focus is on
>> > reasonably resolving the deadlock problems. Alternatively, I can push
>> > the second task forward first and we can discuss the default value in
>> > the PR.
>> >
>> > For the legacySource, I got your idea. I propose we create
>> > a third task to handle it, because it is independent and only for
>> > compatibility with the old API. What do you think? I added
>> > the third task to FLIP-227[2].
>> >
>> > If all is OK, I will create a JIRA for the third task and add it to
>> > FLIP-227. I will develop them in order, from the first task to the
>> > third.
>> >
>> > Thanks again for your help.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-27522
>> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> >
>> > Thanks
>> > fanrui
>> >
>> > On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org>
>> > wrote:
>> >
>> > > Hi fanrui,
>> > >
>> > > > How to identify legacySource?
>> > >
>> > > Legacy sources always use the SourceStreamTask class, and
>> > > SourceStreamTask is used only for legacy sources. But I'm not sure how
>> > > to enable/disable that. Adding a `disableOverdraft()` call in
>> > > SourceStreamTask would be better than relying on the
>> > > `getAvailableFuture()` call (isn't it used for the back pressure
>> > > metric anyway?). Ideally we should enable/disable it in the
>> > > constructors, but that might be tricky.
>> > >
>> > > > I prefer it to be between 5 and 10
>> > >
>> > > I would vote for a smaller value because of FLINK-13203
>> > >
>> > > Piotrek
>> > >
>> > >
>> > >
>> > > On Thu, May 5, 2022 at 11:49, rui fan <1996fan...@gmail.com> wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> Thanks a lot for your discussion.
>> > >>
>> > >> After several discussions, I think it's clear now. I updated the
>> > >> "Proposed Changes" of FLIP-227[1]. If I have missed
>> > >> something, please add it to the FLIP, or mention it in the mail
>> > >> and I will add it to the FLIP. If everything is OK, I will create a
>> > >> new JIRA for the first task, and use FLINK-26762[2] as the
>> > >> second task.
>> > >>
>> > >> About the legacy source, do we set maxOverdraftBuffersPerGate=0
>> > >> directly? How do we identify a legacySource? Or could we add
>> > >> an overdraftEnabled flag in LocalBufferPool? The default value
>> > >> would be false. If getAvailableFuture is called, we set
>> > >> overdraftEnabled=true.
>> > >> It indicates whether isAvailable is checked elsewhere.
>> > >> This might be more general and cover more cases.
>> > >>
>> > >> Also, I think the default value of 'max-overdraft-buffers-per-gate'
>> > >> needs to be confirmed. I prefer it to be between 5 and 10. What
>> > >> do you think?
>> > >>
>> > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> > >> [2] https://issues.apache.org/jira/browse/FLINK-26762
>> > >>
>> > >> Thanks
>> > >> fanrui
>> > >>
>> > >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
>> > >> wrote:
>> > >>
>> > >>> Hi again,
>> > >>>
>> > >>> After sleeping on this, if both versions (reserve and overdraft) have
>> > >>> the same complexity, I would also prefer the overdraft.
>> > >>>
>> > >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
>> > >>> > Dawid mentioned, I think it is dangerous since it is too implicit for
>> > >>> > the user and if the user submits one more job for the same TaskManager
>> > >>>
>> > >>> As I mentioned, it's not only an issue with multiple jobs. The same
>> > >>> problem can happen with different subtasks from the same job, potentially
>> > >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be
>> > >>> in favour of Integer.MAX_VALUE as the default value, but as it is, I
>> > >>> think we should indeed play it safe and limit it.
>> > >>>
>> > >>> > I still don't understand how the "reserve" implementation should be
>> > >>> > limited. I mean if we have X buffers in total and the user sets overdraft
>> > >>> > equal to X we obviously can not reserve all buffers, but how many are we
>> > >>> > allowed to reserve? Should it be a different configuration like
>> > >>> > percentegeForReservedBuffers?
>> > >>>
>> > >>> The reserve could be defined as a percentage, or as a fixed number of
>> > >>> buffers. But yes. In normal operation a subtask would not use the reserve,
>> > >>> as if numberOfAvailableBuffers < reserve, the output would not be available.
>> > >>> Only in the flatMap/timers/huge records case could the reserve be used.
>> > >>>
>> > >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers,
>> > >>> > will LocalBufferPool never be available? Can't process data?
>> > >>>
>> > >>> Of course we would need to make sure that never happens. So the reserve
>> > >>> should be < total buffer size.
>> > >>>
>> > >>> > 2. If the overdraft buffer uses the extra buffers, when the downstream
>> > >>> > task inputBuffer is insufficient, it should fail to start the job, and
>> > >>> > then restart? When the InputBuffer is initialized, it will apply for
>> > >>> > enough buffers, right?
>> > >>>
>> > >>> The failover if downstream can not allocate buffers is already
>> > >>> implemented in FLINK-14872 [2]. There is a timeout for how long the task is
>> > >>> waiting for buffer allocation. However, this doesn't prevent many
>> > >>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
>> > >>> solution for [1] would be 2b described in the ticket:
>> > >>>
>> > >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This
>> > >>> > is a simplified version of 2a, without tracking the tasks sink-to-source.
>> > >>>
>> > >>> But that's a pre-existing problem and I don't think we have to solve it
>> > >>> before implementing overdraft. I think we would need to solve it only
>> > >>> before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I
>> > >>> would hesitate to set the overdraft to anything more than a couple of
>> > >>> buffers by default for the same reason.
>> > >>>
>> > >>> > Actually, I totally agree that we don't need a lot of buffers for
>> > >>> > overdraft
>> > >>>
>> > >>> and
>> > >>>
>> > >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>> > >>> > When we finish this feature and after users use it, if users give
>> > >>> > feedback on this issue we can discuss again.
>> > >>>
>> > >>> +1
>> > >>>
>> > >>> Piotrek
>> > >>>
>> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13203
>> > >>> [2] https://issues.apache.org/jira/browse/FLINK-14872
>> > >>>
>> > >>> On Thu, May 5, 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
>> > >>>
>> > >>>> Hi everyone,
>> > >>>>
>> > >>>> I still have some questions.
>> > >>>>
>> > >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
>> > >>>> LocalBufferPool never be available? Can't process data?
>> > >>>> 2. If the overdraft buffer uses the extra buffers, when the downstream
>> > >>>> task inputBuffer is insufficient, it should fail to start the job, and
>> > >>>> then restart? When the InputBuffer is initialized, it will apply for
>> > >>>> enough buffers, right?
>> > >>>>
>> > >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>> > >>>> When we finish this feature and after users use it, if users give
>> > >>>> feedback on this issue we can discuss again.
>> > >>>>
>> > >>>> Thanks
>> > >>>> fanrui
>> > >>>>
>> > >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> > >>>> wrote:
>> > >>>>
>> > >>>>> Hey all,
>> > >>>>>
>> > >>>>> I have not replied in the thread yet, but I was following the
>> > >>>>> discussion.
>> > >>>>>
>> > >>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it,
>> > >>>>> the idea to distinguish between inside flatMap & outside would be fairly
>> > >>>>> simple, but maybe slightly indirect. The checkAvailability would remain
>> > >>>>> unchanged and it is checked always between separate invocations of the
>> > >>>>> UDF. Therefore the overdraft buffers would not apply there. However, once
>> > >>>>> the pool says it is available, it means it has at least an initial
>> > >>>>> buffer. So any additional request without checking for availability can
>> > >>>>> be considered to be inside of processing a single record. This does not
>> > >>>>> hold just for the LegacySource as I don't think it actually checks for
>> > >>>>> the availability of buffers in the LocalBufferPool.
>> > >>>>>
>> > >>>>> In the offline chat with Anton, we also discussed if we need a limit on
>> > >>>>> the number of buffers we could overdraft (or in other words if the limit
>> > >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
>> > >>>>> on the safe side and have it limited. The pool of network buffers is
>> > >>>>> shared for the entire TaskManager, so it means it can be shared even
>> > >>>>> across tasks of separate jobs. However, I might be just unnecessarily
>> > >>>>> cautious here.
>> > >>>>>
>> > >>>>> Best,
>> > >>>>>
>> > >>>>> Dawid
>> > >>>>>
>> > >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>> > >>>>> > Hi,
>> > >>>>> >
>> > >>>>> > Thanks for the answers.
>> > >>>>> >
>> > >>>>> >> we may still need to discuss whether the
>> > >>>>> >> overdraft/reserve/spare should use extra buffers or buffers
>> > >>>>> >> in (exclusive + floating buffers)?
>> > >>>>> > and
>> > >>>>> >
>> > >>>>> >> These things resolve the different problems (at least as I see that).
>> > >>>>> >> The current hardcoded "1" says that we switch "availability" to
>> > >>>>> >> "unavailability" when one more buffer is left (actually a little less
>> > >>>>> >> than one buffer since we write the last piece of data to this last
>> > >>>>> >> buffer). The overdraft feature doesn't change this logic: we still want
>> > >>>>> >> to switch to "unavailability" in such a way, but if we are already in
>> > >>>>> >> "unavailability" and we want more buffers then we can take "overdraft
>> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>> > >>>>> >> understand when we should switch to "unavailability"
>> > >>>>> > Ok, I see. So it seems to me that both of you have in mind to keep the
>> > >>>>> > buffer pools as they are right now, but if we are in the middle of
>> > >>>>> > processing a record, we can request extra overdraft buffers on top of
>> > >>>>> > those? This is a different way to implement the overdraft from what I was
>> > >>>>> > thinking. I was thinking about something like keeping the "overdraft" or
>> > >>>>> > more precisely buffer "reserve" in the buffer pool. I think my version
>> > >>>>> > would be easier to implement, because it is just fiddling with the min/max
>> > >>>>> > buffers calculation and a slightly modified `checkAvailability()` logic.
>> > >>>>> >
>> > >>>>> > On the other hand, what you have in mind would better utilise the
>> > >>>>> > available memory, right? It would require more code changes (how would we
>> > >>>>> > know when we are allowed to request the overdraft?). However, in this
>> > >>>>> > case, I would be tempted to set the number of overdraft buffers by
>> > >>>>> > default to `Integer.MAX_VALUE`, and let the system request as many
>> > >>>>> > buffers as necessary. The only downside that I can think of (apart from
>> > >>>>> > higher complexity) would be a higher chance of hitting a known/unsolved
>> > >>>>> > deadlock [1] in a scenario:
>> > >>>>> > - downstream task hasn't yet started
>> > >>>>> > - upstream task requests overdraft and uses all available memory segments
>> > >>>>> >   from the global pool
>> > >>>>> > - upstream task is blocked, because downstream task hasn't started yet and
>> > >>>>> >   can not consume any data
>> > >>>>> > - downstream task tries to start, but can not, as there are no available
>> > >>>>> >   buffers
>> > >>>>> >
>> > >>>>> >> BTW, for watermark, the number of buffers it needs is
>> > >>>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> > >>>>> >> the watermark won't block in requestMemory.
>> > >>>>> > and
>> > >>>>> >
>> > >>>>> >> the best overdraft size will be equal to parallelism.
>> > >>>>> > That's a lot of buffers. I don't think we need that many for broadcasting
>> > >>>>> > watermarks. Watermarks are small, and remember that every subpartition has
>> > >>>>> > some partially filled/empty WIP buffer, so the vast majority of
>> > >>>>> > subpartitions will not need to request a new buffer.
>> > >>>>> >
>> > >>>>> > Best,
>> > >>>>> > Piotrek
>> > >>>>> >
>> > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>> > >>>>> >
>> > >>>>> > On Tue, May 3, 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>> > >>>>> >
>> > >>>>> >> Hi,
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >>   >> Do you mean to ignore it while processing records, but keep using
>> > >>>>> >> `maxBuffersPerChannel` when calculating the availability of the output?
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >> Yes, it is correct.
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >>   >> Would it be a big issue if we changed it to check if at least
>> > >>>>> >> "overdraft number of buffers are available", where "overdraft number" is
>> > >>>>> >> configurable, instead of the currently hardcoded value of "1"?
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >> These things resolve the different problems (at least as I see that).
>> > >>>>> >> The current hardcoded "1" says that we switch "availability" to
>> > >>>>> >> "unavailability" when one more buffer is left (actually a little less
>> > >>>>> >> than one buffer since we write the last piece of data to this last
>> > >>>>> >> buffer). The overdraft feature doesn't change this logic: we still want
>> > >>>>> >> to switch to "unavailability" in such a way, but if we are already in
>> > >>>>> >> "unavailability" and we want more buffers then we can take "overdraft
>> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>> > >>>>> >> understand when we should switch to "unavailability"
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >> -- About "reserve" vs "overdraft"
>> > >>>>> >>
>> > >>>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
>> > >>>>> >> equal to parallelism. Also, the user can set any value he wants. So even
>> > >>>>> >> if parallelism is small (~5) but the user's flatmap produces a lot of
>> > >>>>> >> data, the user can set 10 or even more, which almost doubles the max
>> > >>>>> >> buffers and will be impossible to reserve. At least we need to figure
>> > >>>>> >> out how to protect against such cases (a limit for the overdraft?). So
>> > >>>>> >> actually it looks even more difficult than increasing the maximum
>> > >>>>> >> buffers.
>> > >>>>> >>
>> > >>>>> >> I want to emphasize that the overdraft buffers are a soft configuration,
>> > >>>>> >> which means the pool takes as many buffers as the global buffer pool has
>> > >>>>> >> available (maybe zero) but no more than this configured value. It is also
>> > >>>>> >> important to note that perhaps not many subtasks in a TaskManager will
>> > >>>>> >> be using this feature, so we don't actually need a lot of available
>> > >>>>> >> buffers for every subtask (here, I mean that if we have only one
>> > >>>>> >> window/flatmap operator and many other operators, then one TaskManager
>> > >>>>> >> will have many ordinary subtasks which don't actually need overdraft and
>> > >>>>> >> several subtasks that need this feature). But in the case of reservation,
>> > >>>>> >> we will reserve some buffers for all operators even if they don't really
>> > >>>>> >> need it.
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >> -- Legacy source problem
>> > >>>>> >>
>> > >>>>> >> If we still want to change max buffers then it is a problem for
>> > >>>>> >> LegacySources (since every subtask of the source will always use these
>> > >>>>> >> overdraft buffers). But right now, I think that we can force 0 overdraft
>> > >>>>> >> buffers for legacy subtasks in the configuration during execution (if it
>> > >>>>> >> is not too late to change the configuration at this point).
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>> >> On 03.05.2022 14:11, rui fan wrote:
>> > >>>>> >>> Hi
>> > >>>>> >>>
>> > >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree with
>> > >>>>> >>> ignoring the legacy source, as it would affect our design. Users
>> > >>>>> >>> should use the new Source API as much as possible.
>> > >>>>> >>>
>> > >>>>> >>> Hi Piotrek, we may still need to discuss whether the
>> > >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers
>> > >>>>> >>> in (exclusive + floating buffers)? They have some differences.
>> > >>>>> >>>
>> > >>>>> >>> If it uses extra buffers:
>> > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + 1
>> > >>>>> >>>    <= currentPoolSize) and all subpartitions don't reach the
>> > >>>>> >>>    maxBuffersPerChannel.
>> > >>>>> >>>
>> > >>>>> >>> If it uses the buffers in (exclusive + floating buffers):
>> > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>> > >>>>> >>>    overdraftBuffers <= currentPoolSize) and all subpartitions
>> > >>>>> >>>    don't reach the maxBuffersPerChannel.
>> > >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large (>8), the
>> > >>>>> >>>    usedBuffers will be small. That is, the LocalBufferPool will
>> > >>>>> >>>    easily become unavailable. For throughput, if users turn up the
>> > >>>>> >>>    overdraft buffers, they need to turn up exclusive or floating
>> > >>>>> >>>    buffers. It also affects the InputChannel, and it is unfriendly
>> > >>>>> >>>    to users.
>> > >>>>> >>>
>> > >>>>> >>> So I prefer the overdraft to use extra buffers.
>> > >>>>> >>>
>> > >>>>> >>>
>> > >>>>> >>> BTW, for watermark, the number of buffers it needs is
>> > >>>>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> > >>>>> >>> the watermark won't block in requestMemory. But it has
>> > >>>>> >>> 2 problems:
>> > >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
>> > >>>>> >>> (exclusive + floating buffers), there will be fewer buffers
>> > >>>>> >>> available. Throughput may be affected.
>> > >>>>> >>> 2. The numberOfSubpartitions is different for each Task.
>> > >>>>> >>> So if users want to cover watermark using this feature,
>> > >>>>> >>> they don't know how to set the overdraftBuffers
>> > >>>>> >>> reasonably. And if the parallelism is changed, users still
>> > >>>>> >>> need to change overdraftBuffers. It is unfriendly to users.
>> > >>>>> >>>
>> > >>>>> >>> So I propose we support overdraftBuffers=-1, which means
>> > >>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>> > >>>>> >>> in the constructor of LocalBufferPool.
>> > >>>>> >>>
>> > >>>>> >>> Please correct me if I'm wrong.
>> > >>>>> >>>
>> > >>>>> >>> Thanks
>> > >>>>> >>> fanrui
>> > >>>>> >>>
>> > >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > >>>>> >>>> Hi fanrui,
>> > >>>>> >>>>
>> > >>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers *
>> > >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be available when
>> > >>>>> >>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>> > >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> > >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum number of the
>> > >>>>> >>>> required buffers, just as we did when we were implementing the non-blocking
>> > >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back then we
>> > >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this logic is
>> > >>>>> >>>> located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>> > >>>>> >>>>
>> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>> > >>>>> >>>> For performance reasons, we always require at least one buffer per
>> > >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we require 5
>> > >>>>> >>>> overdraft buffers for output to be available, we need to have them on top
>> > >>>>> >>>> of those "one buffer per sub-partition". So the logic should be changed to:
>> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
>> > >>>>> >>>>
>> > >>>>> >>>> Regarding increasing the number of max buffers I'm not sure. As long as
>> > >>>>> >>>> "overdraft << max number of buffers", because all buffers on the outputs
>> > >>>>> >>>> are shared across all sub-partitions. If we have 5 overdraft buffers, and
>> > >>>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of things if we
>> > >>>>> >>>> make the output available if at least one single buffer is available or at
>> > >>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So effects of
>> > >>>>> >>>> increasing the overdraft from 1 to for example 5 should be negligible. For
>> > >>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 still increases
>> > >>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max as it is?
>> > >>>>> >>>>
>> > >>>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer reserve"
>> > >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept in reserve
>> > >>>>> >>>> in case of flatMap/firing timers/huge records"?
>> > >>>>> >>>>
>> > >>>>> >>>> What do you think Fanrui, Anton?
>> > >>>>> >>>>
>> > >>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new features,
>> > >>>>> >>>> as long as we don't break the existing deployments too much.
>> > >>>>> >>>>
>> > >>>>> >>>> Best,
>> > >>>>> >>>> Piotrek
>> > >>>>> >>>>
>> > >>>>> >>>> On Tue, May 3, 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>> > >>>>> >>>>> Hi everyone,
>> > >>>>> >>>>>
>> > >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we should
>> > >>>>> >>>>> not focus too much on improving/adding capabilities for legacy sources. We
>> > >>>>> >>>>> want to persuade and push users to use the new Source API. Yes, this means
>> > >>>>> >>>>> that there's work required by the end users to port any custom source to
>> > >>>>> >>>>> the new interface. The benefits of the new Source API should outweigh this.
>> > >>>>> >>>>> Anything that we build to support multiple interfaces means adding more
>> > >>>>> >>>>> complexity and more possibilities for bugs. Let's try to make our lives a
>> > >>>>> >>>>> little bit easier.
>> > >>>>> >>>>>
>> > >>>>> >>>>> Best regards,
>> > >>>>> >>>>>
>> > >>>>> >>>>> Martijn Visser
>> > >>>>> >>>>> https://twitter.com/MartijnVisser82
>> > >>>>> >>>>> https://github.com/MartijnVisser
>> > >>>>> >>>>>
>> > >>>>> >>>>>
>> > >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>
>> > >>>>> >>>>>> Hi Piotrek
>> > >>>>> >>>>>>
>> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the output?
>> > >>>>> >>>>>> I think yes. Anton Kalashnikov, could you please help double-check?
>> > >>>>> >>>>>>
>> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big problem
>> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already have
>> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single buffer
>> > >>>>> >>>>>>> available and this works the same for all tasks (including legacy source
>> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at least
>> > >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft number" is
>> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>> > >>>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers *
>> > >>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be available when
>> > >>>>> >>>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>> > >>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> > >>>>> >>>>>>
>> > >>>>> >>>>>> If yes, I think it can solve the problem of legacy source. There may be
>> > >>>>> >>>>>> some impact. If overdraftBuffers is large and only one buffer is used to
>> > >>>>> >>>>>> process a single record, exclusive buffers*parallelism + floating buffers
>> > >>>>> >>>>>> cannot be fully used. It may only be possible to use (exclusive buffers *
>> > >>>>> >>>>>> parallelism + floating buffers - overdraft buffers + 1). For throughput, if
>> > >>>>> >>>>>> the Flink user turns up the overdraft buffers, they also need to turn up
>> > >>>>> >>>>>> exclusive or floating buffers. And it also affects the InputChannel.
>> > >>>>> >>>>>>
>> > >>>>> >>>>>> If not, I don't think it can solve the problem of legacy source. The legacy
>> > >>>>> >>>>>> source doesn't check isAvailable. If there are extra buffers, the legacy
>> > >>>>> >>>>>> source will use them up until it blocks in requestMemory.
>> > >>>>> >>>>>>
>> > >>>>> >>>>>>
>> > >>>>> >>>>>> Thanks
>> > >>>>> >>>>>> fanrui
>> > >>>>> >>>>>>
>> > >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org>
>> > >>>>> >>>>>> wrote:
>> > >>>>> >>>>>>
>> > >>>>> >>>>>>> Hi,
>> > >>>>> >>>>>>>
>> > >>>>> >>>>>>> +1 for the general proposal from my side. It would be a nice workaround
>> > >>>>> >>>>>>> for flatMaps, WindowOperators and large records issues with unaligned
>> > >>>>> >>>>>>> checkpoints.
>> > >>>>> >>>>>>>
>> > >>>>> >>>>>>>> The first task is about ignoring max buffers per channel. This means if
>> > >>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the
>> > >>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore that
>> > >>>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has it (it is
>> > >>>>> >>>>>>>> actually not an overdraft).
>> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the output?
>> > >>>>> >>>>>>>> The second task is about the real overdraft. I am pretty convinced now
>> > >>>>> >>>>>>>> that we, unfortunately, need configuration for limitation of overdraft
>> > >>>>> >>>>>>>> number (because it is not ok if one subtask allocates all buffers of one
>> > >>>>> >>>>>>>> TaskManager considering that several different jobs can be submitted on
>> > >>>>> >>>>>>>> this TaskManager). So idea is to have
>> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition (technically to say per LocalBufferPool).
>> > >>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is reached,
>> > >>>>> >>>>>>>> LocalBufferPool can request additionally from NetworkBufferPool up to
>> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers.
>> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big problem
>> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already have
>> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single buffer
>> > >>>>> >>>>>>> available and this works the same for all tasks (including legacy source
>> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at least
>> > >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft number" is
>> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>> > >>>>> >>>>>>>
>> > >>>>> >>>>>>> Best,
>> > >>>>> >>>>>>> Piotrek
>> > >>>>> >>>>>>>
>> > >>>>> >>>>>>> On Fri, Apr 29, 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>>>
>> > >>>>> >>>>>>>> Let me add some information about the LegacySource.
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource,
>> > >>>>> >>>>>>>> could we add an enableOverdraft flag to LocalBufferPool?
>> > >>>>> >>>>>>>> Its default value is false. If getAvailableFuture is called,
>> > >>>>> >>>>>>>> set enableOverdraft=true. The flag indicates whether availability
>> > >>>>> >>>>>>>> (isAvailable) is checked elsewhere.
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if
>> > >>>>> >>>>>>>> I'm wrong.
>> > >>>>> >>>>>>>> Thanks
>> > >>>>> >>>>>>>> fanrui
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Thanks for your quick response.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Questions 1/2/3 are clear to us. We just need to discuss the
>> > >>>>> >>>>>>>>> default value in the PR.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> For the legacy source, you are right. A general implementation is
>> > >>>>> >>>>>>>>> difficult. Currently, we implement ensureRecordWriterIsAvailable()
>> > >>>>> >>>>>>>>> in SourceFunction.SourceContext and call it in our common
>> > >>>>> >>>>>>>>> LegacySources, e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs
>> > >>>>> >>>>>>>>> consume Kafka, so fixing FlinkKafkaConsumer solved most of our
>> > >>>>> >>>>>>>>> problems.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Core code:
>> > >>>>> >>>>>>>>> ```
>> > >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>> > >>>>> >>>>>>>>>     if (recordWriter == null
>> > >>>>> >>>>>>>>>             || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>> > >>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>> > >>>>> >>>>>>>>>         return;
>> > >>>>> >>>>>>>>>     }
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>> > >>>>> >>>>>>>>>     try {
>> > >>>>> >>>>>>>>>         resumeFuture.get();
>> > >>>>> >>>>>>>>>     } catch (Throwable ignored) {
>> > >>>>> >>>>>>>>>     }
>> > >>>>> >>>>>>>>> }
>> > >>>>> >>>>>>>>> ```
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
>> > >>>>> >>>>>>>>> before synchronized (checkpointLock) and collects records.
>> > >>>>> >>>>>>>>> Please let me know if there is a better solution.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Thanks
>> > >>>>> >>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>>> Hi.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs
>> or
>> > >>>>> two
>> > >>>>> >>>>>> commits
>> > >>>>> >>>>>>>> in a
>> > >>>>> >>>>>>>>>>       PR?
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this
>> > >>>>> task has
>> > >>>>> >>>>>> fewer
>> > >>>>> >>>>>>>>>> questions but we should find a solution for
>> LegacySource
>> > >>>>> first.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> --  2. For the first task, if the flink user disables
>> the
>> > >>>>> >>>>> Unaligned
>> > >>>>> >>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
>> > >>>>> Because
>> > >>>>> >>>> the
>> > >>>>> >>>>>>>>>> overdraft
>> > >>>>> >>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still
>> > >>>>> needs to
>> > >>>>> >>>>> wait
>> > >>>>> >>>>>>> for
>> > >>>>> >>>>>>>>>>       downstream Task to consume.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I think the logic should be the same for AC and UC. As I
>> > >>>>> >>>>>>>>>> understand it, the overdraft may not be really helpful for AC,
>> > >>>>> >>>>>>>>>> but it doesn't make things worse either.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>>     3. For the second task
>> > >>>>> >>>>>>>>>> --      - The default value of
>> > >>>>> maxOverdraftBuffersPerPartition
>> > >>>>> >>>> may
>> > >>>>> >>>>>>> also
>> > >>>>> >>>>>>>>>> need
>> > >>>>> >>>>>>>>>>          to be discussed.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I think it should be a pretty small value, or even 0, since it
>> > >>>>> >>>>>>>>>> is a kind of optimization and users should understand what they
>> > >>>>> >>>>>>>>>> are doing (especially if we implement the first task).
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> --      - If the user disables the Unaligned
>> Checkpoint,
>> > >>>>> can we
>> > >>>>> >>>>> set
>> > >>>>> >>>>>>> the
>> > >>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>> > >>>>> overdraft
>> > >>>>> >>>>>> isn't
>> > >>>>> >>>>>>>>>> useful for
>> > >>>>> >>>>>>>>>>          the Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't degrade the
>> > >>>>> >>>>>>>>>> Aligned Checkpoint, I don't think we should differentiate
>> > >>>>> >>>>>>>>>> between AC and UC.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>>       4. For the legacy source
>> > >>>>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it
>> uses up
>> > >>>>> to
>> > >>>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>> > >>>>> >>>>>>>>>>          - If disabling the UC, it doesn't use the
>> > overdraft
>> > >>>>> >>>> buffer.
>> > >>>>> >>>>>>>>>>          - Do you think it's ok?
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Ideally, I don't want to use the overdraft for LegacySource at
>> > >>>>> >>>>>>>>>> all, since it can lead to undesirable results, especially if the
>> > >>>>> >>>>>>>>>> limit is high. At least, as I understand it, LegacySource will
>> > >>>>> >>>>>>>>>> always work in overdraft mode and will borrow
>> > >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool,
>> > >>>>> >>>>>>>>>> which can degrade other subtasks on the same TaskManager.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic
>> for
>> > >>>>> >>>>>> LegacySource
>> > >>>>> >>>>>>>> in
>> > >>>>> >>>>>>>>>> our
>> > >>>>> >>>>>>>>>>          internal version. It works well.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I don't really understand how this is possible in the general
>> > >>>>> >>>>>>>>>> case, considering that each user has their own implementation
>> > >>>>> >>>>>>>>>> of LegacySourceOperator.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> --   5. For the benchmark, do you have any
>> suggestions? I
>> > >>>>> >>>>> submitted
>> > >>>>> >>>>>>> the
>> > >>>>> >>>>>>>> PR
>> > >>>>> >>>>>>>>>>       [1].
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> On 29.04.2022 14:14, rui fan wrote:
>> > >>>>> >>>>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two
>> PRs
>> > >>>>> or two
>> > >>>>> >>>>>>> commits
>> > >>>>> >>>>>>>>>> in a
>> > >>>>> >>>>>>>>>>>       PR?
>> > >>>>> >>>>>>>>>>>       2. For the first task, if the flink user
>> disables
>> > the
>> > >>>>> >>>>>> Unaligned
>> > >>>>> >>>>>>>>>>>       Checkpoint, do we ignore max buffers per
>> channel?
>> > >>>>> Because
>> > >>>>> >>>>> the
>> > >>>>> >>>>>>>>>> overdraft
>> > >>>>> >>>>>>>>>>>       isn't useful for the Aligned Checkpoint, it
>> still
>> > >>>>> needs to
>> > >>>>> >>>>>> wait
>> > >>>>> >>>>>>>> for
>> > >>>>> >>>>>>>>>>>       downstream Task to consume.
>> > >>>>> >>>>>>>>>>>       3. For the second task
>> > >>>>> >>>>>>>>>>>          - The default value of
>> > >>>>> maxOverdraftBuffersPerPartition
>> > >>>>> >>>>> may
>> > >>>>> >>>>>>> also
>> > >>>>> >>>>>>>>>> need
>> > >>>>> >>>>>>>>>>>          to be discussed.
>> > >>>>> >>>>>>>>>>>          - If the user disables the Unaligned
>> Checkpoint,
>> > >>>>> can we
>> > >>>>> >>>>> set
>> > >>>>> >>>>>>> the
>> > >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because
>> the
>> > >>>>> >>>> overdraft
>> > >>>>> >>>>>>> isn't
>> > >>>>> >>>>>>>>>> useful for
>> > >>>>> >>>>>>>>>>>          the Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>>       4. For the legacy source
>> > >>>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it
>> uses
>> > >>>>> up to
>> > >>>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>> > >>>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the
>> > >>>>> overdraft
>> > >>>>> >>>>> buffer.
>> > >>>>> >>>>>>>>>>>          - Do you think it's ok?
>> > >>>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic
>> > for
>> > >>>>> >>>>>>> LegacySource
>> > >>>>> >>>>>>>>>> in our
>> > >>>>> >>>>>>>>>>>          internal version. It works well.
>> > >>>>> >>>>>>>>>>>       5. For the benchmark, do you have any
>> suggestions?
>> > I
>> > >>>>> >>>>> submitted
>> > >>>>> >>>>>>> the
>> > >>>>> >>>>>>>>>> PR
>> > >>>>> >>>>>>>>>>>       [1].
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> Thanks
>> > >>>>> >>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com>
>> > >>>>> >>>>>>>>>>> wrote:
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some
>> > >>>>> >>>>>>>>>>>> conclusions:
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel. This
>> > >>>>> >>>>>>>>>>>> means that if we request a memory segment from the
>> > >>>>> >>>>>>>>>>>> LocalBufferPool and maxBuffersPerChannel is reached for this
>> > >>>>> >>>>>>>>>>>> channel, we just ignore that and continue to allocate buffers
>> > >>>>> >>>>>>>>>>>> while the LocalBufferPool has them (this is actually not an
>> > >>>>> >>>>>>>>>>>> overdraft).
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
>> > >>>>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
>> > >>>>> >>>>>>>>>>>> option to limit the number of overdraft buffers (because it is
>> > >>>>> >>>>>>>>>>>> not OK if one subtask allocates all the buffers of one
>> > >>>>> >>>>>>>>>>>> TaskManager, considering that several different jobs can be
>> > >>>>> >>>>>>>>>>>> submitted to this TaskManager). So the idea is to have
>> > >>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>> > >>>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the buffer limit of a
>> > >>>>> >>>>>>>>>>>> LocalBufferPool is reached, the LocalBufferPool can
>> > >>>>> >>>>>>>>>>>> additionally request up to maxOverdraftBuffersPerPartition
>> > >>>>> >>>>>>>>>>>> buffers from the NetworkBufferPool.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it
>> > >>>>> >>>>>>>>>>>> effectively works as an unlimited flatmap and will always work
>> > >>>>> >>>>>>>>>>>> in overdraft mode, which is not the goal. So we still need to
>> > >>>>> >>>>>>>>>>>> think about that.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> On 29.04.2022 11:11, rui fan wrote:
>> > >>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of
>> > >>>>> >>>>>>>>>>>>> overdraft segments that each LocalBufferPool can apply for,
>> > >>>>> >>>>>>>>>>>>> right?
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers so that we don't add
>> > >>>>> >>>>>>>>>>>>> a new configuration option. And I hope to hear more from the
>> > >>>>> >>>>>>>>>>>>> community.
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> Best wishes
>> > >>>>> >>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally right.
>> > >>>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
>> > >>>>> >>>>>>>>>>>>>> overdraft buffer, so it won't need a new buffer configuration.
>> > >>>>> >>>>>>>>>>>>>> Flink users can turn up maxBuffersNumber to control the
>> > >>>>> >>>>>>>>>>>>>> overdraft buffer size.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we should
>> > >>>>> >>>>>>>>>>>>>> limit the maximum number of overdraft segments that each
>> > >>>>> >>>>>>>>>>>>>> LocalBufferPool can apply for.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Why do we limit it?
>> > >>>>> >>>>>>>>>>>>>> Some operators, such as LegacySource, don't check
>> > >>>>> >>>>>>>>>>>>>> `recordWriter.isAvailable` while processing records. I have
>> > >>>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there are
>> > >>>>> >>>>>>>>>>>>>> other cases.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> If we don't add the limitation, the LegacySource will use up
>> > >>>>> >>>>>>>>>>>>>> all remaining memory in the NetworkBufferPool when the
>> > >>>>> >>>>>>>>>>>>>> backpressure is severe.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> How to limit it?
>> > >>>>> >>>>>>>>>>>>>> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
>> > >>>>> >>>>>>>>>>>>>> in the constructor of LocalBufferPool. The maxOverdraftBuffers
>> > >>>>> >>>>>>>>>>>>>> is just for safety, and it should be enough for most Flink
>> > >>>>> >>>>>>>>>>>>>> jobs. Or we can set
>> > >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
>> > >>>>> >>>>>>>>>>>>>> handle some jobs of low parallelism.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we
>> > >>>>> >>>>>>>>>>>>>> can set maxOverdraftBuffers=0 in the constructor of
>> > >>>>> >>>>>>>>>>>>>> LocalBufferPool, because the overdraft isn't useful for the
>> > >>>>> >>>>>>>>>>>>>> Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Best wishes
>> > >>>>> >>>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
>> > >>>>> >>>>>>>>>>>>>> wrote:
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Hi fanrui,
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it should
>> > >>>>> >>>>>>>>>>>>>>> help in the cases described above. Here are my thoughts about
>> > >>>>> >>>>>>>>>>>>>>> configuration:
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it, right
>> > >>>>> >>>>>>>>>>>>>>> now we have the following calculation.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory (calculated
>> > >>>>> >>>>>>>>>>>>>>> via taskmanager.memory.network.fraction,
>> > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min, taskmanager.memory.network.max
>> > >>>>> >>>>>>>>>>>>>>> and total memory size) / taskmanager.memory.segment-size.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>> > >>>>> >>>>>>>>>>>>>>> parallelism + floating buffers) * subtasks number in TaskManager
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers which are used at
>> > >>>>> >>>>>>>>>>>>>>> the current moment (always <= requiredBuffersNumber)
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
>> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably. But
>> > >>>>> >>>>>>>>>>>>>>> if requiredBuffersNumber > maxBuffersNumber sometimes, it is
>> > >>>>> >>>>>>>>>>>>>>> also fine (but not good), since not all required buffers are
>> > >>>>> >>>>>>>>>>>>>>> really mandatory (e.g. it is OK if Flink cannot allocate
>> > >>>>> >>>>>>>>>>>>>>> floating buffers).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>> > >>>>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover buffers
>> > >>>>> >>>>>>>>>>>>>>> (maxBuffersNumber - requiredBuffersNumber), which I propose to
>> > >>>>> >>>>>>>>>>>>>>> use. (We can actually even use the difference
>> > >>>>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>> > >>>>> >>>>>>>>>>>>>>> TaskManager can contain several operators, including 'window',
>> > >>>>> >>>>>>>>>>>>>>> which can temporarily borrow buffers from the global pool.)
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting
>> > >>>>> >>>>>>>>>>>>>>> buffers while processing a single record; switching to
>> > >>>>> >>>>>>>>>>>>>>> unavailability between records should stay the same as we have
>> > >>>>> >>>>>>>>>>>>>>> it now):
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is
>> > >>>>> >>>>>>>>>>>>>>> reached, then just ignore this limitation and allocate the
>> > >>>>> >>>>>>>>>>>>>>> buffer from any place (from the LocalBufferPool if it still
>> > >>>>> >>>>>>>>>>>>>>> has something, otherwise from the NetworkBufferPool).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> * If the LocalBufferPool exceeds its limit, then temporarily
>> > >>>>> >>>>>>>>>>>>>>> allocate from the NetworkBufferPool while it has something to
>> > >>>>> >>>>>>>>>>>>>>> allocate.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I
>> > >>>>> >>>>>>>>>>>>>>> like it since, on the one hand, it works from scratch without
>> > >>>>> >>>>>>>>>>>>>>> any configuration and, on the other hand, it can be configured
>> > >>>>> >>>>>>>>>>>>>>> by changing the proportion of maxBuffersNumber and
>> > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> The last thing I want to say: I don't really want to implement
>> > >>>>> >>>>>>>>>>>>>>> a new configuration, since even now it is not clear how to
>> > >>>>> >>>>>>>>>>>>>>> correctly configure network buffers with the existing
>> > >>>>> >>>>>>>>>>>>>>> configuration, and I don't want to complicate it, especially
>> > >>>>> >>>>>>>>>>>>>>> if it is possible to resolve the problem automatically (as
>> > >>>>> >>>>>>>>>>>>>>> described above).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> So is my understanding about network
>> memory/buffers
>> > >>>>> >>>> correct?
>> > >>>>> >>>>>>>>>>>>>>> --
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> On 27.04.2022 07:46, rui fan wrote:
>> > >>>>> >>>>>>>>>>>>>>>> Hi everyone,
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
>> > >>>>> >>>>>>>>>>>>>>>> Flink. It effectively solves the problem of checkpoint
>> > >>>>> >>>>>>>>>>>>>>>> timeout or slow checkpoint when backpressure is severe.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well
>> > >>>>> >>>>>>>>>>>>>>>> when the back pressure is severe and multiple output buffers
>> > >>>>> >>>>>>>>>>>>>>>> are required to process a single record. FLINK-14396 [2]
>> > >>>>> >>>>>>>>>>>>>>>> also mentioned this issue before. So we propose the
>> > >>>>> >>>>>>>>>>>>>>>> overdraft buffer to solve it.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>> > >>>>> >>>>>>>>>>>>>>>> overdraft buffer mechanism. After discussing with Anton
>> > >>>>> >>>>>>>>>>>>>>>> Kalashnikov, there are still some points to discuss:
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related
>> > >>>>> >>>>>>>>>>>>>>>>         configurations. Do we need to add a new
>> > >>>>> >>>>>>>>>>>>>>>>         configuration for the overdraft buffer?
>> > >>>>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer take its memory
>> > >>>>> >>>>>>>>>>>>>>>>         from?
>> > >>>>> >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the memory remaining in
>> > >>>>> >>>>>>>>>>>>>>>>         the NetworkBufferPool, no new configuration needs to
>> > >>>>> >>>>>>>>>>>>>>>>         be added.
>> > >>>>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
>> > >>>>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at the
>> > >>>>> >>>>>>>>>>>>>>>>             TM level or the Task level?
>> > >>>>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the number
>> > >>>>> >>>>>>>>>>>>>>>>             of memory-segments that can be overdrawn.
>> > >>>>> >>>>>>>>>>>>>>>>           o What is the default value? How to set sensible
>> > >>>>> >>>>>>>>>>>>>>>>             defaults?
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it using
>> > >>>>> >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at Task
>> > >>>>> >>>>>>>>>>>>>>>> level, and default value is 10. That is: each
>> > >>>>> >>>>>>>>>>>>>>>> LocalBufferPool can overdraw up to 10 memory-segments.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Thanks,
>> > >>>>> >>>>>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> > >>>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>> > >>>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>> > >>>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> > >>>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> > >>>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>> > >>>>> >>>>>>>>>>>> --
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>> --
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>>
>> > >>>>> >> --
>> > >>>>> >>
>> > >>>>> >> Best regards,
>> > >>>>> >> Anton Kalashnikov
>> > >>>>> >>
>> > >>>>> >>
>> > >>>>>
>> > >>>>
>> >
>>
>
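
For readers skimming the quoted thread, the overdraft rule it converges on
(a pool reports "unavailable" once its ordinary limit is reached, but may
still hand out a bounded number of extra buffers so that a record in flight
can be finished) can be sketched roughly as below. This is only a toy model
and not Flink's LocalBufferPool: the class name, the method names and the
single shared counter standing in for the NetworkBufferPool are all
hypothetical, and the value 5 is simply the default mentioned at the top of
this mail.

```java
/** Toy model of the overdraft rule; hypothetical names, not Flink code. */
final class OverdraftPoolSketch {
    private final int poolSize;            // ordinary limit of this local pool
    private final int maxOverdraftBuffers; // extra buffers allowed above poolSize
    private int globalFree;                // stand-in for the shared global pool
    private int inUse;                     // buffers currently handed out

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFree) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFree = globalFree;
    }

    /** Reported to the task between records: false once poolSize is reached. */
    boolean isAvailable() {
        return inUse < poolSize;
    }

    /** Called while processing a record; may overdraw up to the configured cap. */
    boolean requestBuffer() {
        boolean capReached = inUse >= poolSize + maxOverdraftBuffers;
        if (capReached || globalFree == 0) {
            return false; // caller has to wait, as without overdraft
        }
        globalFree--;
        inUse++;
        return true;
    }

    /** Returns one buffer to the shared pool. */
    void recycle() {
        if (inUse > 0) {
            inUse--;
            globalFree++;
        }
    }

    /** Number of buffers currently held above the ordinary limit. */
    int overdraftInUse() {
        return Math.max(0, inUse - poolSize);
    }

    public static void main(String[] args) {
        OverdraftPoolSketch pool = new OverdraftPoolSketch(2, 5, 100);
        int granted = 0;
        while (pool.requestBuffer()) {
            granted++;
        }
        System.out.println("granted=" + granted
                + " overdraft=" + pool.overdraftInUse()
                + " available=" + pool.isAvailable());
    }
}
```

The point of the cap is the one raised in the thread: a task that never
checks availability (such as a legacy source) can take at most
maxOverdraftBuffers extra buffers instead of draining the shared pool.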

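The buffer accounting from Anton's first reply in the quoted thread
(maxBuffersNumber, requiredBuffersNumber and the leftover between them) can
be worked through with a small sketch. The formulas are the ones quoted in
the thread; the concrete numbers (128 MiB of network memory, 32 KiB
segments, 2 exclusive buffers, 8 floating buffers, parallelism 100, 4
subtasks) are assumptions picked only for illustration, and the class and
method names are hypothetical.

```java
/** Worked example of the buffer formulas from the thread; names are hypothetical. */
final class BufferBudgetSketch {

    /** maxBuffersNumber = network memory / segment size (integer division). */
    static long maxBuffers(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffers(int exclusive, int parallelism, int floating, int subtasks) {
        return ((long) exclusive * parallelism + floating) * subtasks;
    }

    /** Buffers that are never required and could serve as overdraft. */
    static long leftover(long maxBuffers, long requiredBuffers) {
        return Math.max(0, maxBuffers - requiredBuffers);
    }

    public static void main(String[] args) {
        long max = maxBuffers(128L * 1024 * 1024, 32L * 1024); // assumed sizes
        long required = requiredBuffers(2, 100, 8, 4);         // assumed job shape
        System.out.println("max=" + max
                + " required=" + required
                + " leftover=" + leftover(max, required));
    }
}
```

With these assumed numbers the TaskManager has 4096 buffers, the job requires
832, and 3264 are left over, which is the pool the overdraft would draw from.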