Hi,

Thanks a lot for the discussion.

After several rounds of discussion, I think the design is clear now. I have
updated the "Proposed Changes" section of FLIP-227 [1]. If I missed
something, please add it to the FLIP, or mention it in this thread
and I will add it to the FLIP. If everything is OK, I will create a
new JIRA ticket for the first task, and use FLINK-26762 [2] for the
second task.

About the legacy source: do we set maxOverdraftBuffersPerGate=0
directly? How would we identify a legacy source? Or could we add
an overdraftEnabled flag to LocalBufferPool? Its default value would
be false, and a call to getAvailableFuture would set
overdraftEnabled=true. The flag then indicates whether the caller
checks isAvailable elsewhere. This might be more general and cover
more cases than special-casing the legacy source.
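
To make the overdraftEnabled idea concrete, here is a minimal sketch.
It is not the real LocalBufferPool: the class OverdraftPoolSketch, its
fields and tryRequestBuffer() are hypothetical names used only for
illustration, and the availability check is simplified.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for LocalBufferPool; not the real Flink class. */
class OverdraftPoolSketch {
    private final int poolSize;            // regular buffers of this pool
    private final int maxOverdraftBuffers; // assumed per-gate overdraft limit
    private int usedBuffers = 0;
    private boolean overdraftEnabled = false; // default value is false

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Asking for the future is taken as a sign that the caller checks availability. */
    CompletableFuture<Void> getAvailableFuture() {
        overdraftEnabled = true;
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (usedBuffers < poolSize) {
            future.complete(null); // already available
        }
        return future;
    }

    boolean isOverdraftEnabled() {
        return overdraftEnabled;
    }

    /** Returns true if a buffer was granted, false where the real pool would block. */
    boolean tryRequestBuffer() {
        // Overdraft buffers are only reachable once availability has been checked.
        int limit = overdraftEnabled ? poolSize + maxOverdraftBuffers : poolSize;
        if (usedBuffers >= limit) {
            return false;
        }
        usedBuffers++;
        return true;
    }
}
```

In this sketch a caller that never calls getAvailableFuture (like a
legacy source) is capped at poolSize, while a caller that does check
availability can go up to poolSize + maxOverdraftBuffers.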

Also, I think the default value of 'max-overdraft-buffers-per-gate'
needs to be confirmed. I would prefer a value between 5 and 10. What
do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[2] https://issues.apache.org/jira/browse/FLINK-26762

Thanks
fanrui

On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi again,
>
> After sleeping on this, if both versions (reserve and overdraft) have
> the same complexity, I would also prefer the overdraft.
>
> > `Integer.MAX_VALUE` as default value was my idea as well but now, as
> > Dawid mentioned, I think it is dangerous since it is too implicit for
> > the user and if the user submits one more job for the same TaskManager
>
> As I mentioned, it's not only an issue with multiple jobs. The same
> problem can happen with different subtasks from the same job, potentially
> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be
> in favour of Integer.MAX_VALUE to be the default value, but as it is, I
> think we should indeed play on the safe side and limit it.
>
> > I still don't understand how the "reserve" implementation should be
> > limited. I mean if we have X buffers in total and the user sets overdraft
> > equal to X we obviously can not reserve all buffers, but how many are we
> > allowed to reserve? Should it be a different configuration like
> > percentageForReservedBuffers?
>
> The reserve could be defined as a percentage, or as a fixed number of
> buffers. But yes. In normal operation a subtask would not use the reserve,
> because if numberOfAvailableBuffers < reserve, the output would not be
> available. Only in the flatMap/timers/huge records case could the reserve
> be used.
>
> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
> > LocalBufferPool never be available? Can't process data?
>
> Of course we would need to make sure that never happens. So the reserve
> should be < total buffer size.
>
> > 2. If the overdraft buffer uses the extra buffers, when the downstream
> > task inputBuffer is insufficient, it should fail to start the job, and
> > then restart? When the InputBuffer is initialized, it will apply for
> > enough buffers, right?
>
> The failover if downstream can not allocate buffers is already implemented
> in FLINK-14872 [2]. There is a timeout for how long the task is waiting for
> buffer allocation. However, this doesn't prevent many (potentially
> infinitely many) deadlock/restart cycles. IMO the proper solution for [1]
> would be 2b described in the ticket:
>
> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This is
> > a simplified version of 2a, without tracking the tasks sink-to-source.
>
> But that's a pre-existing problem and I don't think we have to solve it
> before implementing overdraft. I think we would need to solve it only
> before setting Integer.MAX_VALUE as the default for the overdraft. For the
> same reason, I would probably hesitate to set the overdraft to anything
> more than a couple of buffers by default.
>
> > Actually, I totally agree that we don't need a lot of buffers for
> > overdraft
>
> and
>
> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> > When we finish this feature and after users use it, if users feedback
> > this issue we can discuss again.
>
> +1
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-13203
> [2] https://issues.apache.org/jira/browse/FLINK-14872
>
> On Thu, 5 May 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I still have some questions.
>>
>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
>> LocalBufferPool never be available? Can't process data?
>> 2. If the overdraft buffer uses the extra buffers, when the downstream
>> task inputBuffer is insufficient, it should fail to start the job, and
>> then restart? When the InputBuffer is initialized, it will apply for
>> enough buffers, right?
>>
>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
>> When we finish this feature and after users use it, if users feedback
>> this issue we can discuss again.
>>
>> Thanks
>> fanrui
>>
>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hey all,
>>>
>>> I have not replied in the thread yet, but I was following the discussion.
>>>
>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it
>>> the idea to distinguish between inside flatMap & outside would be fairly
>>> simple, but maybe slightly indirect. The checkAvailability would remain
>>> unchanged and it is checked always between separate invocations of the
>>> UDF. Therefore the overdraft buffers would not apply there. However once
>>> the pool says it is available, it means it has at least an initial
>>> buffer. So any additional request without checking for availability can
>>> be considered to be inside of processing a single record. This does not
>>> hold just for the LegacySource as I don't think it actually checks for
>>> the availability of buffers in the LocalBufferPool.
>>>
>>> In the offline chat with Anton, we also discussed if we need a limit of
>>> the number of buffers we could overdraft (or in other words if the limit
>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
>>> on the safe side and have it limited. The pool of network buffers is
>>> shared for the entire TaskManager, so it means it can be shared even
>>> across tasks of separate jobs. However, I might be just unnecessarily
>>> cautious here.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>>> > Hi,
>>> >
>>> > Thanks for the answers.
>>> >
>>> >> we may still need to discuss whether the
>>> >> overdraft/reserve/spare should use extra buffers or buffers
>>> >> in (exclusive + floating buffers)?
>>> > and
>>> >
>>> >> These things resolve the different problems (at least as I see that).
>>> >> The current hardcoded "1"  says that we switch "availability" to
>>> >> "unavailability" when one more buffer is left(actually a little less
>>> >> than one buffer since we write the last piece of data to this last
>>> >> buffer). The overdraft feature doesn't change this logic we still want
>>> >> to switch to "unavailability" in such a way but if we are already in
>>> >> "unavailability" and we want more buffers then we can take "overdraft
>>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>>> >> understand when we should switch to "unavailability"
>>> > Ok, I see. So it seems to me that both of you have in mind to keep the
>>> > buffer pools as they are right now, but if we are in the middle of
>>> > processing a record, we can request extra overdraft buffers on top of
>>> > those? This is a different way of implementing the overdraft from what
>>> > I was thinking. I was thinking about something like keeping the
>>> > "overdraft", or more precisely a buffer "reserve", in the buffer pool. I
>>> > think my version would be easier to implement, because it is just
>>> > fiddling with the min/max buffers calculation and a slightly modified
>>> > `checkAvailability()` logic.
>>> >
>>> > On the other hand, what you have in mind would better utilise the
>>> > available memory, right? It would require more code changes (how would
>>> > we know when we are allowed to request the overdraft?). However, in this
>>> > case, I would be tempted to set the number of overdraft buffers by
>>> > default to `Integer.MAX_VALUE`, and let the system request as many
>>> > buffers as necessary. The only downside that I can think of (apart from
>>> > higher complexity) would be a higher chance of hitting a known/unsolved
>>> > deadlock [1] in a scenario:
>>> > - downstream task hasn't yet started
>>> > - upstream task requests overdraft and uses all available memory
>>> >   segments from the global pool
>>> > - upstream task is blocked, because downstream task hasn't started yet
>>> >   and can not consume any data
>>> > - downstream task tries to start, but can not, as there are no
>>> >   available buffers
>>> >
>>> >> BTW, for watermark, the number of buffers it needs is
>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>> >> the watermark won't block in requestMemory.
>>> > and
>>> >
>>> >> the best overdraft size will be equal to parallelism.
>>> > That's a lot of buffers. I don't think we need that many for
>>> > broadcasting watermarks. Watermarks are small, and remember that every
>>> > subpartition has some partially filled/empty WIP buffer, so the vast
>>> > majority of subpartitions will not need to request a new buffer.
>>> >
>>> > Best,
>>> > Piotrek
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>>> >
>>> > On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >>
>>> >>   >> Do you mean to ignore it while processing records, but keep using
>>> >>   >> `maxBuffersPerChannel` when calculating the availability of the
>>> >>   >> output?
>>> >>
>>> >>
>>> >> Yes, it is correct.
>>> >>
>>> >>
>>> >>   >> Would it be a big issue if we changed it to check if at least
>>> >>   >> "overdraft number of buffers are available", where "overdraft
>>> >>   >> number" is configurable, instead of the currently hardcoded value
>>> >>   >> of "1"?
>>> >>
>>> >>
>>> >> These things solve different problems (at least as I see it).
>>> >> The current hardcoded "1" says that we switch from "availability" to
>>> >> "unavailability" when one more buffer is left (actually a little less
>>> >> than one buffer, since we write the last piece of data to this last
>>> >> buffer). The overdraft feature doesn't change this logic: we still want
>>> >> to switch to "unavailability" in this way, but if we are already in
>>> >> "unavailability" and we want more buffers, then we can take up to
>>> >> "overdraft number" more. So we can not avoid this hardcoded "1", since
>>> >> we need to understand when we should switch to "unavailability".
>>> >>
>>> >>
>>> >> -- About "reserve" vs "overdraft"
>>> >>
>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
>>> >> equal to parallelism. Also, the user can set any value they want. So
>>> >> even if parallelism is small (~5) but the user's flatMap produces a lot
>>> >> of data, the user can set 10 or even more, which almost doubles the max
>>> >> buffers and would be impossible to reserve. At least we need to figure
>>> >> out how to protect against such cases (a limit for the overdraft?). So
>>> >> it actually looks even more difficult than increasing the maximum
>>> >> buffers.
>>> >>
>>> >> I want to emphasize that overdraft buffers are a soft configuration,
>>> >> which means the pool takes as many buffers as the global buffer pool has
>>> >> available (maybe zero), but not more than this configured value. It is
>>> >> also important to note that perhaps not many subtasks in a TaskManager
>>> >> will be using this feature, so we don't actually need a lot of available
>>> >> buffers for every subtask (here I mean that if we have only one
>>> >> window/flatMap operator and many other operators, then one TaskManager
>>> >> will have many ordinary subtasks which don't actually need overdraft and
>>> >> several subtasks that need this feature). But in the case of
>>> >> reservation, we will reserve some buffers for all operators even if they
>>> >> don't really need them.
>>> >>
>>> >>
>>> >> -- Legacy source problem
>>> >>
>>> >> If we still want to change max buffers, then it is a problem for
>>> >> LegacySources (since every subtask of the source will always use this
>>> >> overdraft). But right now, I think that we can force 0 overdraft
>>> >> buffers for legacy subtasks in the configuration during execution (if
>>> >> it is not too late to change the configuration at that point).
>>> >>
>>> >>
>>> >> On 03.05.2022 14:11, rui fan wrote:
>>> >>> Hi
>>> >>>
>>> >>> Thanks for Martijn Visser's and Piotrek's feedback. I agree with
>>> >>> ignoring the legacy source, since it would otherwise affect our design.
>>> >>> Users should use the new Source API as much as possible.
>>> >>>
>>> >>> Hi Piotrek, we may still need to discuss whether the
>>> >>> overdraft/reserve/spare should use extra buffers or buffers
>>> >>> in (exclusive + floating buffers)? They have some differences.
>>> >>>
>>> >>> If it uses extra buffers:
>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1
>>> >>>    <= currentPoolSize) and all subpartitions don't reach the
>>> >>> maxBuffersPerChannel.
>>> >>>
>>> >>> If it uses the buffers in (exclusive + floating buffers):
>>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
>>> >>> don't reach the maxBuffersPerChannel.
>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large (>8), the
>>> >>> usedBuffers will be small. That is, the LocalBufferPool will
>>> >>> easily become unavailable. For throughput, if users turn up the
>>> >>> overdraft buffers, they need to turn up exclusive or floating
>>> >>> buffers. It also affects the InputChannel, and it is unfriendly
>>> >>> to users.
>>> >>>
>>> >>> So I prefer the overdraft to use extra buffers.
>>> >>>
>>> >>>
>>> >>> BTW, for watermark, the number of buffers it needs is
>>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>> >>> the watermark won't block in requestMemory. But it has
>>> >>> 2 problems:
>>> >>> 1. It needs more overdraft buffers. If the overdraft uses
>>> >>> (exclusive + floating buffers),  there will be fewer buffers
>>> >>> available. Throughput may be affected.
>>> >>> 2. The numberOfSubpartitions is different for each Task.
>>> >>> So if users want to cover watermarks using this feature,
>>> >>> they don't know how to set overdraftBuffers reasonably.
>>> >>> And if the parallelism is changed, users still
>>> >>> need to change overdraftBuffers. It is unfriendly to users.
>>> >>>
>>> >>> So I propose we support overdraftBuffers=-1, which means
>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>>> >>> in the constructor of LocalBufferPool.
>>> >>>
>>> >>> Please correct me if I'm wrong.
>>> >>>
>>> >>> Thanks
>>> >>> fanrui
>>> >>>
>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org>
>>> >> wrote:
>>> >>>> Hi fanrui,
>>> >>>>
>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>> >>>>> buffers * parallelism + floating buffers)? The LocalBufferPool will
>>> >>>>> be available when
>>> >>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>> >>>> I'm not sure. Definitely we would need to adjust the minimum number
>>> >>>> of the required buffers, just as we did when we were implementing the
>>> >>>> non-blocking outputs and adding availability logic to
>>> >>>> LocalBufferPool. Back then we added "+ 1" to the minimum number of
>>> >>>> buffers. Currently this logic is located in
>>> >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>>> >>>>
>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>>> >>>>
>>> >>>> For performance reasons, we always require at least one buffer per
>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we
>>> >>>> require 5 overdraft buffers for output to be available, we need to
>>> >>>> have them on top of those "one buffer per sub-partition". So the
>>> >>>> logic should be changed to:
>>> >>>>
>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
>>> >>>>
>>> >>>> Regarding increasing the number of max buffers, I'm not sure. As
>>> >>>> long as "overdraft << max number of buffers", it should not be
>>> >>>> necessary, because all buffers on the outputs are shared across all
>>> >>>> sub-partitions. If we have 5 overdraft buffers and a parallelism of
>>> >>>> 100, it doesn't matter in the grand scheme of things if we make the
>>> >>>> output available when at least one single buffer is available or when
>>> >>>> at least 5 buffers are available out of ~200 (100 * 2 + 8). So the
>>> >>>> effects of increasing the overdraft from 1 to, for example, 5 should
>>> >>>> be negligible. For small parallelism, like 5, increasing overdraft
>>> >>>> from 1 to 5 still increases the overdraft by only about 25%. So maybe
>>> >>>> we can keep the max as it is?
>>> >>>>
>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer
>>> >>>> reserve" or "spare buffers"? And document it as "number of buffers
>>> >>>> kept in reserve in case of flatMap/firing timers/huge records"?
>>> >>>>
>>> >>>> What do you think, Fanrui, Anton?
>>> >>>>
>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new
>>> >>>> features, as long as we don't break the existing deployments too much.
>>> >>>>
>>> >>>> Best,
>>> >>>> Piotrek
>>> >>>>
>>> >>>> On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>>> >>>>> Hi everyone,
>>> >>>>>
>>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we
>>> >>>>> should not focus too much on improving/adding capabilities for legacy
>>> >>>>> sources. We want to persuade and push users to use the new Source
>>> >>>>> API. Yes, this means that there's work required by the end users to
>>> >>>>> port any custom source to the new interface. The benefits of the new
>>> >>>>> Source API should outweigh this. Anything that we build to support
>>> >>>>> multiple interfaces means adding more complexity and more
>>> >>>>> possibilities for bugs. Let's try to make our lives a little bit
>>> >>>>> easier.
>>> >>>>>
>>> >>>>> Best regards,
>>> >>>>>
>>> >>>>> Martijn Visser
>>> >>>>> https://twitter.com/MartijnVisser82
>>> >>>>> https://github.com/MartijnVisser
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>>> >>>>>
>>> >>>>>> Hi Piotrek
>>> >>>>>>
>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>> >>>>>>> output?
>>> >>>>>> I think so. Anton Kalashnikov, could you please double-check?
>>> >>>>>>
>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>>> >>>>> problem
>>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>>> have
>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>> >>>> buffer
>>> >>>>>>> available and this works the same for all tasks (including legacy
>>> >>>>> source
>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at
>>> least
>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>>> number"
>>> >>>>> is
>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive
>>> >>>>>> buffers * parallelism + floating buffers)? The LocalBufferPool will
>>> >>>>>> be available when
>>> >>>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>> >>>>>>
>>> >>>>>> If yes, I think it can solve the problem of the legacy source, but
>>> >>>>>> there may be some impact. If overdraftBuffers is large and only one
>>> >>>>>> buffer is used to process a single record, (exclusive buffers *
>>> >>>>>> parallelism + floating buffers) cannot be fully used. It may only be
>>> >>>>>> possible to use (exclusive buffers * parallelism + floating buffers
>>> >>>>>> - overdraft buffers + 1). For throughput, if the Flink user turns up
>>> >>>>>> the overdraft buffers, they also need to turn up exclusive or
>>> >>>>>> floating buffers. And it also affects the InputChannel.
>>> >>>>>>
>>> >>>>>> If not, I don't think it can solve the problem of the legacy source.
>>> >>>>>> The legacy source doesn't check isAvailable. If there are extra
>>> >>>>>> buffers, the legacy source will use them up until it blocks in
>>> >>>>>> requestMemory.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> Thanks
>>> >>>>>> fanrui
>>> >>>>>>
>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <
>>> pnowoj...@apache.org>
>>> >>>>>> wrote:
>>> >>>>>>
>>> >>>>>>> Hi,
>>> >>>>>>>
>>> >>>>>>> +1 for the general proposal from my side. It would be a nice
>>> >>>>>>> workaround for the flatMaps, WindowOperators and large records
>>> >>>>>>> issues with unaligned checkpoints.
>>> >>>>>>>
>>> >>>>>>>> The first task is about ignoring max buffers per channel. This
>>> >>>> means
>>> >>>>> if
>>> >>>>>>>> we request a memory segment from LocalBufferPool and the
>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore
>>> >>>> that
>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has it(it
>>> is
>>> >>>>>>>> actually not a overdraft).
>>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>>> >>>> output?
>>> >>>>>>>> The second task is about the real overdraft. I am pretty
>>> convinced
>>> >>>>> now
>>> >>>>>>>> that we, unfortunately, need configuration for limitation of
>>> >>>>> overdraft
>>> >>>>>>>> number(because it is not ok if one subtask allocates all
>>> buffers of
>>> >>>>> one
>>> >>>>>>>> TaskManager considering that several different jobs can be
>>> >>>> submitted
>>> >>>>> on
>>> >>>>>>>> this TaskManager). So idea is to have
>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
>>> >>>>>> LocalBufferPool).
>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is
>>> >>>> reached,
>>> >>>>>>>> LocalBufferPool can request additionally from NetworkBufferPool
>>> up
>>> >>>> to
>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers.
>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>>> >>>>> problem
>>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>>> have
>>> >>>>>>> effectively hardcoded a single overdraft buffer.
>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>>> >>>> buffer
>>> >>>>>>> available and this works the same for all tasks (including legacy
>>> >>>>> source
>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at
>>> least
>>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>>> number"
>>> >>>>> is
>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Piotrek
>>> >>>>>>>
>>> >>>>>>> On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>>> Let me add some information about the LegacySource.
>>> >>>>>>>>
>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource,
>>> >>>>>>>> could we add an enableOverdraft flag in LocalBufferPool?
>>> >>>>>>>> The default value is false. If getAvailableFuture is called,
>>> >>>>>>>> set enableOverdraft=true. It indicates whether availability
>>> >>>>>>>> is checked elsewhere.
>>> >>>>>>>>
>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if
>>> >>>>>>>> I'm wrong.
>>> >>>>>>>> Thanks
>>> >>>>>>>> fanrui
>>> >>>>>>>>
>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>>>>> Hi,
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks for your quick response.
>>> >>>>>>>>>
>>> >>>>>>>>> For questions 1/2/3, we think they are clear. We just need to
>>> >>>>>>>>> discuss the default value in the PR.
>>> >>>>>>>>>
>>> >>>>>>>>> For the legacy source, you are right. It's difficult to implement
>>> >>>>>>>>> in general. Currently, we implement ensureRecordWriterIsAvailable()
>>> >>>>>>>>> in SourceFunction.SourceContext and call it in our common
>>> >>>>>>>>> LegacySources, e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs
>>> >>>>>>>>> consume Kafka, so fixing FlinkKafkaConsumer solved most of our
>>> >>>>>>>>> problems.
>>> >>>>>>>>>
>>> >>>>>>>>> Core code:
>>> >>>>>>>>> ```
>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>>> >>>>>>>>>     // Nothing to wait for without a writer, with unaligned
>>> >>>>>>>>>     // checkpoints disabled, or when the writer is available.
>>> >>>>>>>>>     if (recordWriter == null
>>> >>>>>>>>>             || !configuration.getBoolean(
>>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>>> >>>>>>>>>         return;
>>> >>>>>>>>>     }
>>> >>>>>>>>>
>>> >>>>>>>>>     // Block until the record writer becomes available again.
>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>>> >>>>>>>>>     try {
>>> >>>>>>>>>         resumeFuture.get();
>>> >>>>>>>>>     } catch (Throwable ignored) {
>>> >>>>>>>>>     }
>>> >>>>>>>>> }
>>> >>>>>>>>> ```
>>> >>>>>>>>>
>>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
>>> >>>>>>>>> before entering synchronized (checkpointLock) and collecting records.
>>> >>>>>>>>> Please let me know if there is a better solution.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks
>>> >>>>>>>>> fanrui
>>> >>>>>>>>>
>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
>>> >>>>>> kaa....@yandex.com>
>>> >>>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi.
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two
>>> >>>>>> commits
>>> >>>>>>>> in a
>>> >>>>>>>>>>       PR?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this task
>>> has
>>> >>>>>> fewer
>>> >>>>>>>>>> questions but we should find a solution for LegacySource
>>> first.
>>> >>>>>>>>>>
>>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
>>> >>>>> Unaligned
>>> >>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
>>> Because
>>> >>>> the
>>> >>>>>>>>>> overdraft
>>> >>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still needs
>>> to
>>> >>>>> wait
>>> >>>>>>> for
>>> >>>>>>>>>>       downstream Task to consume.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I
>>> >>>>>>>> understand,
>>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it
>>> doesn't
>>> >>>>> make
>>> >>>>>>> it
>>> >>>>>>>>>> worse as well.
>>> >>>>>>>>>>
>>> >>>>>>>>>>     3. For the second task
>>> >>>>>>>>>> --      - The default value of maxOverdraftBuffersPerPartition
>>> >>>> may
>>> >>>>>>> also
>>> >>>>>>>>>> need
>>> >>>>>>>>>>          to be discussed.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think it should be a pretty small value or even 0 since it
>>> >>>> kind
>>> >>>>> of
>>> >>>>>>>>>> optimization and user should understand what they
>>> do(especially
>>> >>>> if
>>> >>>>>> we
>>> >>>>>>>>>> implement the first task).
>>> >>>>>>>>>>
>>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint, can
>>> we
>>> >>>>> set
>>> >>>>>>> the
>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>>> overdraft
>>> >>>>>> isn't
>>> >>>>>>>>>> useful for
>>> >>>>>>>>>>          the Aligned Checkpoint.
>>> >>>>>>>>>>
>>> >>>>>>>>>> The same answer that above, if the overdraft doesn't make
>>> >>>>>> degradation
>>> >>>>>>>> for
>>> >>>>>>>>>> the Aligned Checkpoint I don't think that we should make
>>> >>>>> difference
>>> >>>>>>>> between
>>> >>>>>>>>>> AC and UC.
>>> >>>>>>>>>>
>>> >>>>>>>>>>       4. For the legacy source
>>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it uses up to
>>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>>> >>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>>> >>>> buffer.
>>> >>>>>>>>>>          - Do you think it's ok?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all
>>> >>>>> since
>>> >>>>>>> it
>>> >>>>>>>>>> can lead to undesirable results especially if the limit is
>>> high.
>>> >>>>> At
>>> >>>>>>>> least,
>>> >>>>>>>>>> as I understand, it will always work in overdraft mode and it
>>> >>>> will
>>> >>>>>>>> borrow
>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool
>>> >>>> which
>>> >>>>>> can
>>> >>>>>>>> lead
>>> >>>>>>>>>> to degradation of other subtasks on the same TaskManager.
>>> >>>>>>>>>>
>>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
>>> >>>>>> LegacySource
>>> >>>>>>>> in
>>> >>>>>>>>>> our
>>> >>>>>>>>>>          internal version. It works well.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I don't really understand how it is possible for general case
>>> >>>>>>>> considering
>>> >>>>>>>>>> that each user has their own implementation of
>>> >>>>> LegacySourceOperator
>>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
>>> >>>>> submitted
>>> >>>>>>> the
>>> >>>>>>>> PR
>>> >>>>>>>>>>       [1].
>>> >>>>>>>>>>
>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 29.04.2022 14:14, rui fan wrote:
>>> >>>>>>>>>>> Hi,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs or
>>> two
>>> >>>>>>> commits
>>> >>>>>>>>>> in a
>>> >>>>>>>>>>>       PR?
>>> >>>>>>>>>>>       2. For the first task, if the flink user disables the
>>> >>>>>> Unaligned
>>> >>>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
>>> Because
>>> >>>>> the
>>> >>>>>>>>>> overdraft
>>> >>>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still
>>> needs to
>>> >>>>>> wait
>>> >>>>>>>> for
>>> >>>>>>>>>>>       downstream Task to consume.
>>> >>>>>>>>>>>       3. For the second task
>>> >>>>>>>>>>>          - The default value of
>>> maxOverdraftBuffersPerPartition
>>> >>>>> may
>>> >>>>>>> also
>>> >>>>>>>>>> need
>>> >>>>>>>>>>>          to be discussed.
>>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint,
>>> can we
>>> >>>>> set
>>> >>>>>>> the
>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>>> >>>> overdraft
>>> >>>>>>> isn't
>>> >>>>>>>>>> useful for
>>> >>>>>>>>>>>          the Aligned Checkpoint.
>>> >>>>>>>>>>>       4. For the legacy source
>>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses up
>>> to
>>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>>> >>>>> buffer.
>>> >>>>>>>>>>>          - Do you think it's ok?
>>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic for
>>> >>>>>>> LegacySource
>>> >>>>>>>>>> in our
>>> >>>>>>>>>>>          internal version. It works well.
>>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions? I
>>> >>>>> submitted
>>> >>>>>>> the
>>> >>>>>>>>>> PR
>>> >>>>>>>>>>>       [1].
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks
>>> >>>>>>>>>>> fanrui
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov
>>> >>>>>>>>>>> <kaa....@yandex.com> wrote:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> Hi,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> We discussed this briefly with Dawid Wysakowicz. Here are
>>> >>>>>>>>>>>> some conclusions:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
>>> >>>>>>>>>>>> This means that if we request a memory segment from
>>> >>>>>>>>>>>> LocalBufferPool and maxBuffersPerChannel is reached for
>>> >>>>>>>>>>>> this channel, we just ignore that and continue to allocate
>>> >>>>>>>>>>>> buffers while LocalBufferPool has them (this is actually
>>> >>>>>>>>>>>> not an overdraft).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
>>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
>>> >>>>>>>>>>>> option to limit the number of overdraft buffers (because
>>> >>>>>>>>>>>> it is not OK if one subtask allocates all buffers of one
>>> >>>>>>>>>>>> TaskManager, considering that several different jobs can
>>> >>>>>>>>>>>> be submitted to this TaskManager). So the idea is to have
>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>>> >>>>>>>>>>>> LocalBufferPool). In this case, when the buffer limit of
>>> >>>>>>>>>>>> LocalBufferPool is reached, LocalBufferPool can
>>> >>>>>>>>>>>> additionally request up to
>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers from
>>> >>>>>>>>>>>> NetworkBufferPool.
>>> >>>>>>>>>>>>
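The second task described above can be sketched roughly as follows. This is only an illustration, not Flink's actual LocalBufferPool: the class OverdraftPoolSketch, its fields, and its methods are hypothetical names, and the NetworkBufferPool is reduced to a simple counter of free buffers.

```java
/** Hypothetical sketch of a local pool that may overdraw a global pool. */
final class OverdraftPoolSketch {
    private final int poolSize;            // regular limit of the local pool
    private final int maxOverdraftBuffers; // assumed per-pool overdraft limit
    private int globalFreeBuffers;         // stand-in for the NetworkBufferPool
    private int buffersInUse;

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    /** Availability only reflects the regular limit, never the overdraft. */
    boolean isAvailable() {
        return buffersInUse < poolSize;
    }

    int overdraftInUse() {
        return Math.max(0, buffersInUse - poolSize);
    }

    int globalFreeBuffers() {
        return globalFreeBuffers;
    }

    /** Returns true if a buffer was granted, from the pool or as overdraft. */
    boolean requestBuffer() {
        if (buffersInUse < poolSize) {
            buffersInUse++;
            return true;
        }
        // Regular limit reached: borrow from the global pool, up to the cap.
        if (overdraftInUse() < maxOverdraftBuffers && globalFreeBuffers > 0) {
            globalFreeBuffers--;
            buffersInUse++;
            return true;
        }
        return false;
    }

    /** Overdraft buffers are handed back to the global pool first. */
    void recycleBuffer() {
        if (buffersInUse == 0) {
            return;
        }
        if (buffersInUse > poolSize) {
            globalFreeBuffers++;
        }
        buffersInUse--;
    }
}
```

In this sketch a task that only checks isAvailable between records stops as soon as the regular limit is reached, while a record that is already being processed can still obtain up to maxOverdraftBuffers extra buffers.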
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource,
>>> >>>>>>>>>>>> since it effectively works as an unlimited flatmap and
>>> >>>>>>>>>>>> would always run in overdraft mode, which is not the
>>> >>>>>>>>>>>> goal. So we still need to think about that.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>      29.04.2022 11:11, rui fan wrote:
>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum
>>> >>>>>>>>>>>>> number of overdraft segments that each LocalBufferPool
>>> >>>>>>>>>>>>> can apply for, right?
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers so that we
>>> >>>>>>>>>>>>> don't need to add a new configuration option. And I hope
>>> >>>>>>>>>>>>> to hear more from the community.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Best wishes
>>> >>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan
>>> >>>>>>>>>>>>> <1996fan...@gmail.com> wrote:
>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are
>>> >>>>>>>>>>>>>> totally right. The 'maxBuffersNumber -
>>> >>>>>>>>>>>>>> buffersInUseNumber' can be used as the overdraft
>>> >>>>>>>>>>>>>> buffer, so it won't need a new buffer configuration.
>>> >>>>>>>>>>>>>> Flink users can turn up maxBuffersNumber to control the
>>> >>>>>>>>>>>>>> overdraft buffer size.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we
>>> >>>>>>>>>>>>>> should limit the maximum number of overdraft segments
>>> >>>>>>>>>>>>>> that each LocalBufferPool can apply for.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Why do we limit it?
>>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable`
>>> >>>>>>>>>>>>>> while processing records, such as LegacySource. I have
>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there
>>> >>>>>>>>>>>>>> are other cases.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> If we don't add the limit, the LegacySource will use up
>>> >>>>>>>>>>>>>> all remaining memory in the NetworkBufferPool when the
>>> >>>>>>>>>>>>>> backpressure is severe.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> How to limit it?
>>> >>>>>>>>>>>>>> I prefer to hard-code
>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
>>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers
>>> >>>>>>>>>>>>>> is just for safety, and it should be enough for most
>>> >>>>>>>>>>>>>> Flink jobs. Or we can set
>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
>>> >>>>>>>>>>>>>> to handle some jobs with low parallelism.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned
>>> >>>>>>>>>>>>>> Checkpoint, we can set maxOverdraftBuffers=0 in the
>>> >>>>>>>>>>>>>> constructor of LocalBufferPool, because the overdraft
>>> >>>>>>>>>>>>>> isn't useful for the Aligned Checkpoint.
>>> >>>>>>>>>>>>>>
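The hard-coded limit suggested in this message could look roughly like the sketch below. The class and method names are hypothetical, and the unalignedCheckpointEnabled flag is an assumed input; this is not how LocalBufferPool is actually constructed.

```java
/** Hypothetical helper for the hard-coded overdraft limit discussed above. */
final class OverdraftLimitSketch {
    // Floor taken from the Math.max(numberOfSubpartitions, 10) suggestion.
    static final int MIN_OVERDRAFT_BUFFERS = 10;

    static int maxOverdraftBuffers(int numberOfSubpartitions,
                                   boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // The message argues that overdraft does not help aligned checkpoints.
            return 0;
        }
        // Low-parallelism jobs still get at least MIN_OVERDRAFT_BUFFERS.
        return Math.max(numberOfSubpartitions, MIN_OVERDRAFT_BUFFERS);
    }
}
```

With these assumptions, a pool with 4 subpartitions gets a limit of 10, a pool with 128 subpartitions gets 128, and any pool gets 0 when the Unaligned Checkpoint is disabled.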
>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Best wishes
>>> >>>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>>> >>>>>>>>>>>>>> <kaa....@yandex.com> wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Hi fanrui,
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
>>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
>>> >>>>>>>>>>>>>>> thoughts about configuration:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand
>>> >>>>>>>>>>>>>>> it, right now we have the following calculation.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = network memory
>>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive
>>> >>>>>>>>>>>>>>> buffers * parallelism + floating buffers) * subtasks
>>> >>>>>>>>>>>>>>> number in TaskManager
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers in use at
>>> >>>>>>>>>>>>>>> the current moment (always <= requiredBuffersNumber)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work
>>> >>>>>>>>>>>>>>> predictably. But if requiredBuffersNumber >
>>> >>>>>>>>>>>>>>> maxBuffersNumber, sometimes that is also fine (but not
>>> >>>>>>>>>>>>>>> good), since not all required buffers are really
>>> >>>>>>>>>>>>>>> mandatory (e.g. it is OK if Flink cannot allocate
>>> >>>>>>>>>>>>>>> floating buffers).
>>> >>>>>>>>>>>>>>>
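The calculation above can be written out as a small worked example. This is only a sketch of the arithmetic as stated in the message; the class, the method names, and the sample numbers are hypothetical and do not come from Flink's code.

```java
/** Hypothetical arithmetic for the buffer numbers described above. */
final class BufferBudgetSketch {
    /** maxBuffersNumber = network memory / segment size. */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffersNumber(int exclusiveBuffers, int parallelism,
                                      int floatingBuffers, int subtasksInTaskManager) {
        return ((long) exclusiveBuffers * parallelism + floatingBuffers)
                * subtasksInTaskManager;
    }

    /** Buffers that no pool requires; zero if the TaskManager is short. */
    static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0L, maxBuffers - requiredBuffers);
    }
}
```

For example, assuming 128 MiB of network memory and 32 KiB segments, maxBuffersNumber is 4096. With 2 exclusive buffers, parallelism 100, 8 floating buffers and 4 subtasks, requiredBuffersNumber is 832, which leaves 3264 buffers unused.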
>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover
>>> >>>>>>>>>>>>>>> buffers (maxBuffersNumber - requiredBuffersNumber),
>>> >>>>>>>>>>>>>>> which I propose to use. (We can actually even use the
>>> >>>>>>>>>>>>>>> difference 'requiredBuffersNumber -
>>> >>>>>>>>>>>>>>> buffersInUseNumber', since one TaskManager may contain
>>> >>>>>>>>>>>>>>> several operators, including 'window', which can
>>> >>>>>>>>>>>>>>> temporarily borrow buffers from the global pool.)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to
>>> >>>>>>>>>>>>>>> requesting buffers while processing a single record;
>>> >>>>>>>>>>>>>>> switching to unavailability between records should stay
>>> >>>>>>>>>>>>>>> the same as it is now):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> * If one more buffer is requested but
>>> >>>>>>>>>>>>>>> maxBuffersPerChannel is reached, then just ignore this
>>> >>>>>>>>>>>>>>> limit and allocate the buffer from any place (from
>>> >>>>>>>>>>>>>>> LocalBufferPool if it still has something, otherwise
>>> >>>>>>>>>>>>>>> from NetworkBufferPool)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then
>>> >>>>>>>>>>>>>>> temporarily allocate from NetworkBufferPool while it
>>> >>>>>>>>>>>>>>> has something to allocate
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work,
>>> >>>>>>>>>>>>>>> but I like it since, on the one hand, it works out of
>>> >>>>>>>>>>>>>>> the box without any configuration and, on the other
>>> >>>>>>>>>>>>>>> hand, it can be tuned by changing the ratio of
>>> >>>>>>>>>>>>>>> maxBuffersNumber to requiredBuffersNumber.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> The last thing I want to say: I don't really want to
>>> >>>>>>>>>>>>>>> introduce a new configuration option, since even now
>>> >>>>>>>>>>>>>>> it is not clear how to correctly configure network
>>> >>>>>>>>>>>>>>> buffers with the existing options, and I don't want to
>>> >>>>>>>>>>>>>>> complicate it, especially if it is possible to resolve
>>> >>>>>>>>>>>>>>> the problem automatically (as described above).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers
>>> >>>>>>>>>>>>>>> correct?
>>> >>>>>>>>>>>>>>> --
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Best regards,
>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>>> >>>>>>>>>>>>>>>> Hi everyone,
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature
>>> >>>>>>>>>>>>>>>> of Flink. It effectively solves the problem of
>>> >>>>>>>>>>>>>>>> checkpoint timeout or slow checkpoint when
>>> >>>>>>>>>>>>>>>> backpressure is severe.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work
>>> >>>>>>>>>>>>>>>> well when the backpressure is severe and multiple
>>> >>>>>>>>>>>>>>>> output buffers are required to process a single
>>> >>>>>>>>>>>>>>>> record. FLINK-14396 [2] also mentioned this issue
>>> >>>>>>>>>>>>>>>> before. So we propose the overdraft buffer to solve
>>> >>>>>>>>>>>>>>>> it.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail
>>> >>>>>>>>>>>>>>>> the overdraft buffer mechanism. After discussing with
>>> >>>>>>>>>>>>>>>> Anton Kalashnikov, there are still some points to
>>> >>>>>>>>>>>>>>>> discuss:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related
>>> >>>>>>>>>>>>>>>>         configurations. Do we need to add a new
>>> >>>>>>>>>>>>>>>>         configuration for the overdraft buffer?
>>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer take its
>>> >>>>>>>>>>>>>>>>         memory from?
>>> >>>>>>>>>>>>>>>>       * If the overdraft buffer uses the memory
>>> >>>>>>>>>>>>>>>>         remaining in the NetworkBufferPool, no new
>>> >>>>>>>>>>>>>>>>         configuration needs to be added.
>>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
>>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at
>>> >>>>>>>>>>>>>>>>             the TM level or the Task level?
>>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the
>>> >>>>>>>>>>>>>>>>             number of memory-segments that can be
>>> >>>>>>>>>>>>>>>>             overdrawn.
>>> >>>>>>>>>>>>>>>>           o What is the default value? How to set
>>> >>>>>>>>>>>>>>>>             sensible defaults?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it
>>> >>>>>>>>>>>>>>>> using flink-benchmarks [6]. The POC sets
>>> >>>>>>>>>>>>>>>> overdraft-buffers at Task level, and the default
>>> >>>>>>>>>>>>>>>> value is 10. That is: each LocalBufferPool can
>>> >>>>>>>>>>>>>>>> overdraw up to 10 memory-segments.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> [1]
>>> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>> >>>>>>>>>>>>>>>> [4]
>>> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>> >>>>>>>>>>>>>>>> [5]
>>> >>>>>>>>>>>>>>>> https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>>> >>>>>>>>>>>> --
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Best regards,
>>> >>>>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>> --
>>> >>>>>>>>>>
>>> >>>>>>>>>> Best regards,
>>> >>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >> --
>>> >>
>>> >> Best regards,
>>> >> Anton Kalashnikov
>>> >>
>>> >>
>>>
>>
