Hi again,

After sleeping on this, if both versions (reserve and overdraft) have the
same complexity, I would also prefer the overdraft.

> `Integer.MAX_VALUE` as default value was my idea as well but now, as
> Dawid mentioned, I think it is dangerous since it is too implicit for
> the user and if the user submits one more job for the same TaskManager

As I mentioned, it's not only an issue with multiple jobs. The same problem
can happen with different subtasks from the same job, potentially leading
to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be in
favour of making Integer.MAX_VALUE the default value, but as it is, I think
we should indeed play it safe and limit it.

> I still don't understand how the "reserve" implementation should be limited.
> I mean if we have X buffers in total and the user sets overdraft equal
> to X we obviously can not reserve all buffers, but how many we are
> allowed to reserve? Should it be a different configuration like
> percentegeForReservedBuffers?

The reserve could be defined as a percentage or as a fixed number of
buffers. But yes, in normal operation a subtask would not use the reserve:
if numberOfAvailableBuffers < reserve, the output would not be available.
The reserve could be used only in the flatMap/timers/huge records case.
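
To make the reserve semantics concrete, here is a rough Java sketch. It is
not Flink's LocalBufferPool: the class ReserveBufferPoolSketch and all of its
fields and methods are made-up names, and blocking, recycling listeners and
the global pool are left out. With reserve = 1 it behaves like the current
hardcoded check for a single available buffer.

```java
// Hypothetical illustration of the "reserve" idea; not Flink code.
final class ReserveBufferPoolSketch {
    private final int totalBuffers; // size of the local pool
    private final int reserve;      // buffers kept back for mid-record requests
    private int usedBuffers;

    ReserveBufferPoolSketch(int totalBuffers, int reserve) {
        // The reserve has to be smaller than the pool, otherwise the
        // output could never become available.
        if (reserve < 1 || reserve >= totalBuffers) {
            throw new IllegalArgumentException("reserve must be in [1, totalBuffers)");
        }
        this.totalBuffers = totalBuffers;
        this.reserve = reserve;
    }

    int numberOfAvailableBuffers() {
        return totalBuffers - usedBuffers;
    }

    // Checked between records: unavailable once fewer than `reserve`
    // buffers are free.
    boolean isAvailable() {
        return numberOfAvailableBuffers() >= reserve;
    }

    // Called while a record is being processed: may dip into the reserve.
    // Returns false where a real pool would block.
    boolean requestBuffer() {
        if (usedBuffers == totalBuffers) {
            return false;
        }
        usedBuffers++;
        return true;
    }

    void recycleBuffer() {
        if (usedBuffers > 0) {
            usedBuffers--;
        }
    }
}
```

With totalBuffers = 10 and reserve = 3, the output turns unavailable once
fewer than 3 buffers are free, but a record that is already being processed
can still take the remaining ones.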

> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
> LocalBufferPool never be available? Can't process data?

Of course we would need to make sure that never happens. So the reserve
must be strictly smaller than the total size of the buffer pool.

> 2. If the overdraft buffer use the extra buffers, when the downstream
> task inputBuffer is insufficient, it should fail to start the job, and
> then restart? When the InputBuffer is initialized, it will apply for
> enough buffers, right?

The failover when the downstream task can not allocate buffers is already
implemented in FLINK-14872 [2]. There is a timeout for how long the task
waits for buffer allocation. However, this doesn't prevent many
(potentially infinitely many) deadlock/restart cycles. IMO the proper
solution for [1] would be 2b described in the ticket:

> 2b. Assign extra buffers only once all of the tasks are RUNNING. This is
> a simplified version of 2a, without tracking the tasks sink-to-source.

But that's a pre-existing problem and I don't think we have to solve it
before implementing overdraft. I think we would need to solve it only
before setting Integer.MAX_VALUE as the default for the overdraft. For the
same reason, I would hesitate to set the overdraft to anything more than a
couple of buffers by default.
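
For reference, the overdraft variant as I understand it from this thread
could be sketched roughly as below. Again this is only an illustration under
my assumptions, not the actual implementation: OverdraftPoolSketch and its
members are hypothetical names, the global pool is reduced to a counter, and
a real pool would block instead of returning false.

```java
// Hypothetical illustration of a bounded overdraft; not Flink code.
final class OverdraftPoolSketch {
    private final int maxBuffers;          // regular limit of the local pool
    private final int maxOverdraftBuffers; // configured overdraft limit
    private int usedBuffers;               // regular buffers in use
    private int overdraftBuffers;          // overdraft buffers in use
    private int globalFreeBuffers;         // stand-in for the global pool

    OverdraftPoolSketch(int maxBuffers, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.maxBuffers = maxBuffers;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    // Availability check stays as it is today: overdraft is not counted
    // as capacity, so the output is unavailable while any of it is in use.
    boolean isAvailable() {
        return usedBuffers + overdraftBuffers < maxBuffers;
    }

    // Requests made without checking availability first happen in the
    // middle of processing a record, so they may go into overdraft.
    boolean requestBuffer() {
        if (usedBuffers < maxBuffers) {
            usedBuffers++;
            return true;
        }
        // Soft limit: take what the global pool still has, but never more
        // than the configured number of overdraft buffers.
        if (overdraftBuffers < maxOverdraftBuffers && globalFreeBuffers > 0) {
            overdraftBuffers++;
            globalFreeBuffers--;
            return true;
        }
        return false;
    }

    // Overdraft buffers are handed back to the global pool first.
    void recycleBuffer() {
        if (overdraftBuffers > 0) {
            overdraftBuffers--;
            globalFreeBuffers++;
        } else if (usedBuffers > 0) {
            usedBuffers--;
        }
    }

    int overdraftInUse() {
        return overdraftBuffers;
    }
}
```

A small limit keeps a single subtask from draining the global pool, which
is the situation that makes the FLINK-13203 deadlock more likely.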

> Actually, I totally agree that we don't need a lot of buffers for
> overdraft

and

> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> When we finish this feature and after users use it, if users feedback
> this issue we can discuss again.

+1

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13203
[2] https://issues.apache.org/jira/browse/FLINK-14872

On Thu, 5 May 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:

> Hi everyone,
>
> I still have some questions.
>
> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
> LocalBufferPool never be available? Can't process data?
> 2. If the overdraft buffer use the extra buffers, when the downstream
> task inputBuffer is insufficient, it should fail to start the job, and
> then restart? When the InputBuffer is initialized, it will apply for
> enough buffers, right?
>
> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> When we finish this feature and after users use it, if users feedback
> this issue we can discuss again.
>
> Thanks
> fanrui
>
> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hey all,
>>
>> I have not replied in the thread yet, but I was following the discussion.
>>
>> Personally, I like Fanrui's and Anton's idea. As far as I understand it
>> the idea to distinguish between inside flatMap & outside would be fairly
>> simple, but maybe slightly indirect. The checkAvailability would remain
>> unchanged and it is checked always between separate invocations of the
>> UDF. Therefore the overdraft buffers would not apply there. However once
>> the pool says it is available, it means it has at least an initial
>> buffer. So any additional request without checking for availability can
>> be considered to be inside of processing a single record. This does not
>> hold just for the LegacySource as I don't think it actually checks for
>> the availability of buffers in the LocalBufferPool.
>>
>> In the offline chat with Anton, we also discussed if we need a limit of
>> the number of buffers we could overdraft (or in other words if the limit
>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay
>> on the safe side and have it limited. The pool of network buffers is
>> shared for the entire TaskManager, so it means it can be shared even
>> across tasks of separate jobs. However, I might be just unnecessarily
>> cautious here.
>>
>> Best,
>>
>> Dawid
>>
>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>> > Hi,
>> >
>> > Thanks for the answers.
>> >
>> >> we may still need to discuss whether the
>> >> overdraft/reserve/spare should use extra buffers or buffers
>> >> in (exclusive + floating buffers)?
>> > and
>> >
>> >> These things resolve the different problems (at least as I see that).
>> >> The current hardcoded "1"  says that we switch "availability" to
>> >> "unavailability" when one more buffer is left(actually a little less
>> >> than one buffer since we write the last piece of data to this last
>> >> buffer). The overdraft feature doesn't change this logic we still want
>> >> to switch to "unavailability" in such a way but if we are already in
>> >> "unavailability" and we want more buffers then we can take "overdraft
>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>> >> understand when we should switch to "unavailability"
>> > Ok, I see. So it seems to me that both of you have in mind to keep the
>> > buffer pools as they are right now, but if we are in the middle of
>> > processing a record, we can request extra overdraft buffers on top of
>> > those? This is another way to implement the overdraft to what I was
>> > thinking. I was thinking about something like keeping the "overdraft" or
>> > more precisely buffer "reserve" in the buffer pool. I think my version
>> > would be easier to implement, because it is just fiddling with min/max
>> > buffers calculation and slightly modified `checkAvailability()` logic.
>> >
>> > On the other hand, what you have in mind would better utilise the
>> > available memory, right? It would require more code changes (how would
>> > we know when we are allowed to request the overdraft?). However, in
>> > this case, I would be tempted to set the number of overdraft buffers by
>> > default to `Integer.MAX_VALUE`, and let the system request as many
>> > buffers as necessary. The only downside that I can think of (apart from
>> > higher complexity) would be a higher chance of hitting a known/unsolved
>> > deadlock [1] in a scenario:
>> > - downstream task hasn't yet started
>> > - upstream task requests overdraft and uses all available memory
>> >   segments from the global pool
>> > - upstream task is blocked, because downstream task hasn't started yet
>> >   and can not consume any data
>> > - downstream task tries to start, but can not, as there are no available
>> >   buffers
>> >
>> >> BTW, for watermark, the number of buffers it needs is
>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> >> the watermark won't block in requestMemory.
>> > and
>> >
>> >> the best overdraft size will be equal to parallelism.
>> > That's a lot of buffers. I don't think we need that many for
>> > broadcasting watermarks. Watermarks are small, and remember that every
>> > subpartition has some partially filled/empty WIP buffer, so the vast
>> > majority of subpartitions will not need to request a new buffer.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>> >
>> > On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >>
>> >>   >> Do you mean to ignore it while processing records, but keep using
>> >> `maxBuffersPerChannel` when calculating the availability of the output?
>> >>
>> >>
>> >> Yes, it is correct.
>> >>
>> >>
>> >>   >> Would it be a big issue if we changed it to check if at least
>> >> "overdraft number of buffers are available", where "overdraft number"
>> >> is configurable, instead of the currently hardcoded value of "1"?
>> >>
>> >>
>> >> These things resolve the different problems (at least as I see that).
>> >> The current hardcoded "1"  says that we switch "availability" to
>> >> "unavailability" when one more buffer is left(actually a little less
>> >> than one buffer since we write the last piece of data to this last
>> >> buffer). The overdraft feature doesn't change this logic we still want
>> >> to switch to "unavailability" in such a way but if we are already in
>> >> "unavailability" and we want more buffers then we can take "overdraft
>> >> number" more. So we can not avoid this hardcoded "1" since we need to
>> >> understand when we should switch to "unavailability"
>> >>
>> >>
>> >> -- About "reserve" vs "overdraft"
>> >>
>> >> As Fanrui mentioned above, perhaps, the best overdraft size will be
>> >> equal to parallelism. Also, the user can set any value they want. So
>> >> even if parallelism is small (~5) but the user's flatMap produces a lot
>> >> of data, the user can set 10 or even more, which almost doubles the max
>> >> buffers and will be impossible to reserve. At least we need to figure
>> >> out how to protect from such cases (a limit for the overdraft?). So
>> >> actually it looks even more difficult than increasing the maximum
>> >> buffers.
>> >>
>> >> I want to emphasize that overdraft buffers are a soft configuration,
>> >> which means the pool takes as many buffers as the global buffer pool
>> >> has available (maybe zero) but no more than this configured value. It is
>> >> also important to notice that perhaps not many subtasks in a TaskManager
>> >> will be using this feature, so we don't actually need a lot of available
>> >> buffers for every subtask (here, I mean that if we have only one
>> >> window/flatMap operator and many other operators, then one TaskManager
>> >> will have many ordinary subtasks which don't actually need overdraft
>> >> and several subtasks that need this feature). But in the case of
>> >> reservation, we will reserve some buffers for all operators even if
>> >> they don't really need them.
>> >>
>> >>
>> >> -- Legacy source problem
>> >>
>> >> If we still want to change max buffers then it is a problem for
>> >> LegacySources (since every subtask of a source will always use this
>> >> overdraft). But right now, I think that we can force 0 overdraft
>> >> buffers for legacy subtasks in the configuration during execution (if
>> >> it is not too late to change the configuration at that point).
>> >>
>> >>
>> >> 03.05.2022 14:11, rui fan wrote:
>> >>> Hi
>> >>>
>> >>> Thanks for Martijn Visser and Piotrek's feedback.  I agree with
>> >>> ignoring the legacy source, it will affect our design. User should
>> >>> use the new Source Api as much as possible.
>> >>>
>> >>> Hi Piotrek, we may still need to discuss whether the
>> >>> overdraft/reserve/spare should use extra buffers or buffers
>> >>> in (exclusive + floating buffers)? They have some differences.
>> >>>
>> >>> If it uses extra buffers:
>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1
>> >>>    <= currentPoolSize) and all subpartitions don't reach the
>> >>> maxBuffersPerChannel.
>> >>>
>> >>> If it uses the buffers in (exclusive + floating buffers):
>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions
>> >>> don't reach the maxBuffersPerChannel.
>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the
>> >>> usedBuffers will be small. That is the LocalBufferPool will be
>> >>> easily unavailable. For throughput, if users turn up the
>> >>> overdraft buffers, they need to turn up exclusive or floating
>> >>> buffers. It also affects the InputChannel, and it is unfriendly
>> >>> to users.
>> >>>
>> >>> So I prefer the overdraft to use extra buffers.
>> >>>
>> >>>
>> >>> BTW, for watermark, the number of buffers it needs is
>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> >>> the watermark won't block in requestMemory. But it has
>> >>> 2 problems:
>> >>> 1. It needs more overdraft buffers. If the overdraft uses
>> >>> (exclusive + floating buffers),  there will be fewer buffers
>> >>> available. Throughput may be affected.
>> >>> 2. The numberOfSubpartitions is different for each Task.
>> >>> So if users want to cover watermark using this feature,
>> >>> they don't know how to set the overdraftBuffers more
>> >>> reasonably. And if the parallelism is changed, users still
>> >>> need to change overdraftBuffers. It is unfriendly to users.
>> >>>
>> >>> So I propose we support overdraftBuffers=-1. It means
>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>> >>> in the constructor of LocalBufferPool.
>> >>>
>> >>> Please correct me if I'm wrong.
>> >>>
>> >>> Thanks
>> >>> fanrui
>> >>>
>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org>
>> >> wrote:
>> >>>> Hi fanrui,
>> >>>>
>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive
>> >> buffers *
>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be
>> available
>> >>>> when
>> >>>>> (usedBuffers+overdraftBuffers <=
>> >>>> exclusiveBuffers*parallelism+floatingBuffers)
>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> >>>> I'm not sure. Definitely we would need to adjust the minimum number
>> >>>> of the required buffers, just as we did when we were implementing the
>> >>>> non blocking outputs and adding availability logic to LocalBufferPool.
>> >>>> Back then we added "+ 1" to the minimum number of buffers. Currently
>> >>>> this logic is located in
>> >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>> >>>>
>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>> >>>> For performance reasons, we always require at least one buffer per
>> >>>> sub-partition. Otherwise performance falls drastically. Now if we
>> >>>> require 5 overdraft buffers for output to be available, we need to
>> >>>> have them on top of those "one buffer per sub-partition". So the logic
>> >>>> should be changed to:
>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions +
>> >>>>> numOverdraftBuffers;
>> >>>>
>> >>>> Regarding increasing the number of max buffers I'm not sure. It should
>> >>>> be fine as long as "overdraft << max number of buffers", because all
>> >>>> buffers on the outputs are shared across all sub-partitions. If we
>> >>>> have 5 overdraft buffers, and parallelism of 100, it doesn't matter in
>> >>>> the grand scheme of things if we make the output available if at least
>> >>>> one single buffer is available or at least 5 buffers are available out
>> >>>> of ~200 (100 * 2 + 8). So effects of increasing the overdraft from 1 to
>> >>>> for example 5 should be negligible. For small parallelism, like 5,
>> >>>> increasing overdraft from 1 to 5 still increases the overdraft by only
>> >>>> about 25%. So maybe we can keep the max as it is?
>> >>>>
>> >>>> If so, maybe we should change the name from "overdraft" to "buffer
>> >>>> reserve" or "spare buffers"? And document it as "number of buffers
>> >>>> kept in reserve in case of flatMap/firing timers/huge records"?
>> >>>>
>> >>>> What do you think Fanrui, Anton?
>> >>>>
>> >>>> Re LegacySources. I agree we can kind of ignore them in the new
>> >>>> features, as long as we don't break the existing deployments too much.
>> >>>>
>> >>>> Best,
>> >>>> Piotrek
>> >>>>
>> >>>> On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com>
>> >>>> wrote:
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we
>> >>>>> should not focus too much on improving/adding capabilities for legacy
>> >>>>> sources. We want to persuade and push users to use the new Source API.
>> >>>>> Yes, this means that there's work required by the end users to port
>> >>>>> any custom source to the new interface. The benefits of the new Source
>> >>>>> API should outweigh this. Anything that we build to support multiple
>> >>>>> interfaces means adding more complexity and more possibilities for
>> >>>>> bugs. Let's try to make our lives a little bit easier.
>> >>>>>
>> >>>>> Best regards,
>> >>>>>
>> >>>>> Martijn Visser
>> >>>>> https://twitter.com/MartijnVisser82
>> >>>>> https://github.com/MartijnVisser
>> >>>>>
>> >>>>>
>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Piotrek
>> >>>>>>
>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>> >>>> output?
>> >>>>>> I think yes, and please Anton Kalashnikov to help double check.
>> >>>>>>
>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>> >>>>> problem
>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>> have
>> >>>>>>> effectively hardcoded a single overdraft buffer.
>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>> >>>> buffer
>> >>>>>>> available and this works the same for all tasks (including legacy
>> >>>>> source
>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at
>> least
>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>> number"
>> >>>>> is
>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive
>> >>>>>> buffers * parallelism + floating buffers)? The LocalBufferPool will
>> >>>>>> be available when (usedBuffers+overdraftBuffers <=
>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers)
>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> >>>>>>
>> >>>>>> If yes, I think it can solve the problem of legacy source. There may
>> >>>>>> be some impact. If overdraftBuffers is large and only one buffer is
>> >>>>>> used to process a single record, exclusive buffers*parallelism +
>> >>>>>> floating buffers cannot be used. It may only be possible to use
>> >>>>>> (exclusive buffers * parallelism + floating buffers - overdraft
>> >>>>>> buffers + 1). For throughput, if users turn up the overdraft buffers,
>> >>>>>> they need to turn up exclusive or floating buffers. And it also
>> >>>>>> affects the InputChannel.
>> >>>>>>
>> >>>>>> If not, I don't think it can solve the problem of legacy source. The
>> >>>>>> legacy source doesn't check isAvailable. If there are extra buffers,
>> >>>>>> the legacy source will use them up until it blocks in requestMemory.
>> >>>>>>
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>> fanrui
>> >>>>>>
>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <
>> pnowoj...@apache.org>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> +1 for the general proposal from my side. It would be a nice
>> >>>>>>> workaround for flatMaps, WindowOperators and large records issues
>> >>>>>>> with unaligned checkpoints.
>> >>>>>>>
>> >>>>>>>> The first task is about ignoring max buffers per channel. This
>> >>>> means
>> >>>>> if
>> >>>>>>>> we request a memory segment from LocalBufferPool and the
>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore
>> >>>> that
>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has it(it
>> is
>> >>>>>>>> actually not a overdraft).
>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>> >>>> output?
>> >>>>>>>> The second task is about the real overdraft. I am pretty
>> convinced
>> >>>>> now
>> >>>>>>>> that we, unfortunately, need configuration for limitation of
>> >>>>> overdraft
>> >>>>>>>> number(because it is not ok if one subtask allocates all buffers
>> of
>> >>>>> one
>> >>>>>>>> TaskManager considering that several different jobs can be
>> >>>> submitted
>> >>>>> on
>> >>>>>>>> this TaskManager). So idea is to have
>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
>> >>>>>> LocalBufferPool).
>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is
>> >>>> reached,
>> >>>>>>>> LocalBufferPool can request additionally from NetworkBufferPool
>> up
>> >>>> to
>> >>>>>>>> maxOverdraftBuffersPerPartition buffers.
>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>> >>>>> problem
>> >>>>>>> that legacy sources would be ignoring it? Note that we already
>> have
>> >>>>>>> effectively hardcoded a single overdraft buffer.
>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>> >>>> buffer
>> >>>>>>> available and this works the same for all tasks (including legacy
>> >>>>> source
>> >>>>>>> tasks). Would it be a big issue if we changed it to check if at
>> least
>> >>>>>>> "overdraft number of buffers are available", where "overdraft
>> number"
>> >>>>> is
>> >>>>>>> configurable, instead of the currently hardcoded value of "1"?
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Piotrek
>> >>>>>>>
>> >>>>>>> On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>> >>>>>>>
>> >>>>>>>> Let me add some information about the LegacySource.
>> >>>>>>>>
>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource,
>> >>>>>>>> could we add an enableOverdraft flag in LocalBufferPool?
>> >>>>>>>> The default value is false. If getAvailableFuture is called,
>> >>>>>>>> set enableOverdraft=true. It indicates whether availability is
>> >>>>>>>> checked elsewhere.
>> >>>>>>>>
>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if
>> >>>>>>>> I'm wrong.
>> >>>>>>>> Thanks
>> >>>>>>>> fanrui
>> >>>>>>>>
>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com>
>> >>>>> wrote:
>> >>>>>>>>> Hi,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for your quick response.
>> >>>>>>>>>
>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need to
>> >>>>>>>>> discuss the default value in PR.
>> >>>>>>>>>
>> >>>>>>>>> For the legacy source, you are right. It's difficult for general
>> >>>>>>>>> implementation.
>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in
>> >>>>>>>>> SourceFunction.SourceContext. And call it in our common
>> >>>>>>>>> LegacySource, e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs
>> >>>>>>>>> consume kafka, so fixing FlinkKafkaConsumer solved most of our
>> >>>>>>>>> problems.
>> >>>>>>>>>
>> >>>>>>>>> Core code:
>> >>>>>>>>> ```
>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>> >>>>>>>>>     // Nothing to wait for without a writer, with unaligned
>> >>>>>>>>>     // checkpoints disabled, or if the writer is already available.
>> >>>>>>>>>     if (recordWriter == null
>> >>>>>>>>>             || !configuration.getBoolean(
>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>> >>>>>>>>>             || recordWriter.isAvailable()) {
>> >>>>>>>>>         return;
>> >>>>>>>>>     }
>> >>>>>>>>>
>> >>>>>>>>>     // Block until the record writer has a free buffer again.
>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>> >>>>>>>>>     try {
>> >>>>>>>>>         resumeFuture.get();
>> >>>>>>>>>     } catch (Throwable ignored) {
>> >>>>>>>>>         // Failures while waiting are deliberately ignored here.
>> >>>>>>>>>     }
>> >>>>>>>>> }
>> >>>>>>>>> ```
>> >>>>>>>>>
>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
>> >>>>>>>>> before synchronized (checkpointLock) and collects records.
>> >>>>>>>>> Please let me know if there is a better solution.
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>> fanrui
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
>> >>>>>> kaa....@yandex.com>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi.
>> >>>>>>>>>>
>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two
>> >>>>>> commits
>> >>>>>>>> in a
>> >>>>>>>>>>       PR?
>> >>>>>>>>>>
>> >>>>>>>>>> Perhaps, the separated ticket will be better since this task
>> has
>> >>>>>> fewer
>> >>>>>>>>>> questions but we should find a solution for LegacySource first.
>> >>>>>>>>>>
>> >>>>>>>>>> --  2. For the first task, if the flink user disables the
>> >>>>> Unaligned
>> >>>>>>>>>>       Checkpoint, do we ignore max buffers per channel? Because
>> >>>> the
>> >>>>>>>>>> overdraft
>> >>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still needs
>> to
>> >>>>> wait
>> >>>>>>> for
>> >>>>>>>>>>       downstream Task to consume.
>> >>>>>>>>>>
>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I
>> >>>>>>>> understand,
>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it doesn't
>> >>>>> make
>> >>>>>>> it
>> >>>>>>>>>> worse as well.
>> >>>>>>>>>>
>> >>>>>>>>>>     3. For the second task
>> >>>>>>>>>> --      - The default value of maxOverdraftBuffersPerPartition
>> >>>> may
>> >>>>>>> also
>> >>>>>>>>>> need
>> >>>>>>>>>>          to be discussed.
>> >>>>>>>>>>
>> >>>>>>>>>> I think it should be a pretty small value or even 0 since it
>> >>>> kind
>> >>>>> of
>> >>>>>>>>>> optimization and user should understand what they do(especially
>> >>>> if
>> >>>>>> we
>> >>>>>>>>>> implement the first task).
>> >>>>>>>>>>
>> >>>>>>>>>> --      - If the user disables the Unaligned Checkpoint, can we
>> >>>>> set
>> >>>>>>> the
>> >>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>> overdraft
>> >>>>>> isn't
>> >>>>>>>>>> useful for
>> >>>>>>>>>>          the Aligned Checkpoint.
>> >>>>>>>>>>
>> >>>>>>>>>> The same answer that above, if the overdraft doesn't make
>> >>>>>> degradation
>> >>>>>>>> for
>> >>>>>>>>>> the Aligned Checkpoint I don't think that we should make
>> >>>>> difference
>> >>>>>>>> between
>> >>>>>>>>>> AC and UC.
>> >>>>>>>>>>
>> >>>>>>>>>>       4. For the legacy source
>> >>>>>>>>>> --      - If enabling the Unaligned Checkpoint, it uses up to
>> >>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>> >>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>> >>>> buffer.
>> >>>>>>>>>>          - Do you think it's ok?
>> >>>>>>>>>>
>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all
>> >>>>>>>>>> since it can lead to undesirable results, especially if the limit
>> >>>>>>>>>> is high. At least, as I understand, it will always work in
>> >>>>>>>>>> overdraft mode and it will borrow maxOverdraftBuffersPerPartition
>> >>>>>>>>>> buffers from the global pool, which can lead to degradation of
>> >>>>>>>>>> other subtasks on the same TaskManager.
>> >>>>>>>>>>
>> >>>>>>>>>> --      - Actually, we added the checkAvailable logic for
>> >>>>>> LegacySource
>> >>>>>>>> in
>> >>>>>>>>>> our
>> >>>>>>>>>>          internal version. It works well.
>> >>>>>>>>>>
>> >>>>>>>>>> I don't really understand how it is possible for general case
>> >>>>>>>> considering
>> >>>>>>>>>> that each user has their own implementation of
>> >>>>> LegacySourceOperator
>> >>>>>>>>>> --   5. For the benchmark, do you have any suggestions? I
>> >>>>> submitted
>> >>>>>>> the
>> >>>>>>>> PR
>> >>>>>>>>>>       [1].
>> >>>>>>>>>>
>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> 29.04.2022 14:14, rui fan wrote:
>> >>>>>>>>>>> Hi,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>> >>>>>>>>>>>
>> >>>>>>>>>>>       1. Do you mean split this into two JIRAs or two PRs or
>> two
>> >>>>>>> commits
>> >>>>>>>>>> in a
>> >>>>>>>>>>>       PR?
>> >>>>>>>>>>>       2. For the first task, if the flink user disables the
>> >>>>>> Unaligned
>> >>>>>>>>>>>       Checkpoint, do we ignore max buffers per channel?
>> Because
>> >>>>> the
>> >>>>>>>>>> overdraft
>> >>>>>>>>>>>       isn't useful for the Aligned Checkpoint, it still needs
>> to
>> >>>>>> wait
>> >>>>>>>> for
>> >>>>>>>>>>>       downstream Task to consume.
>> >>>>>>>>>>>       3. For the second task
>> >>>>>>>>>>>          - The default value of
>> maxOverdraftBuffersPerPartition
>> >>>>> may
>> >>>>>>> also
>> >>>>>>>>>> need
>> >>>>>>>>>>>          to be discussed.
>> >>>>>>>>>>>          - If the user disables the Unaligned Checkpoint, can
>> we
>> >>>>> set
>> >>>>>>> the
>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition=0? Because the
>> >>>> overdraft
>> >>>>>>> isn't
>> >>>>>>>>>> useful for
>> >>>>>>>>>>>          the Aligned Checkpoint.
>> >>>>>>>>>>>       4. For the legacy source
>> >>>>>>>>>>>          - If enabling the Unaligned Checkpoint, it uses up to
>> >>>>>>>>>>>          maxOverdraftBuffersPerPartition buffers.
>> >>>>>>>>>>>          - If disabling the UC, it doesn't use the overdraft
>> >>>>> buffer.
>> >>>>>>>>>>>          - Do you think it's ok?
>> >>>>>>>>>>>          - Actually, we added the checkAvailable logic for
>> >>>>>>> LegacySource
>> >>>>>>>>>> in our
>> >>>>>>>>>>>          internal version. It works well.
>> >>>>>>>>>>>       5. For the benchmark, do you have any suggestions? I
>> >>>>> submitted
>> >>>>>>> the
>> >>>>>>>>>> PR
>> >>>>>>>>>>>       [1].
>> >>>>>>>>>>>
>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks
>> >>>>>>>>>>> fanrui
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <
>> >>>>>>> kaa....@yandex.com
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some
>> >>>>>>>>>>>> conclusions:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
>> >>>> This
>> >>>>>>> means
>> >>>>>>>> if
>> >>>>>>>>>>>> we request a memory segment from LocalBufferPool and the
>> >>>>>>>>>>>> maxBuffersPerChannel is reached for this channel, we just
>> >>>>> ignore
>> >>>>>>> that
>> >>>>>>>>>>>> and continue to allocate buffer while LocalBufferPool has
>> >>>> it(it
>> >>>>>> is
>> >>>>>>>>>>>> actually not a overdraft).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
>> >>>>>> convinced
>> >>>>>>>> now
>> >>>>>>>>>>>> that we, unfortunately, need configuration for limitation of
>> >>>>>>>> overdraft
>> >>>>>>>>>>>> number(because it is not ok if one subtask allocates all
>> >>>>> buffers
>> >>>>>> of
>> >>>>>>>> one
>> >>>>>>>>>>>> TaskManager considering that several different jobs can be
>> >>>>>>> submitted
>> >>>>>>>> on
>> >>>>>>>>>>>> this TaskManager). So idea is to have
>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition(technically to say per
>> >>>>>>>>>> LocalBufferPool).
>> >>>>>>>>>>>> In this case, when a limit of buffers in LocalBufferPool is
>> >>>>>>> reached,
>> >>>>>>>>>>>> LocalBufferPool can request additionally from
>> >>>> NetworkBufferPool
>> >>>>>> up
>> >>>>>>> to
>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it
>> >>>>>>>>>>>> actually works as an unlimited flatMap and would always run in
>> >>>>>>>>>>>> overdraft mode, which is not the goal. So we still need to
>> >>>>>>>>>>>> think about that.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>      29.04.2022 11:11, rui fan wrote:
>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of
>> >>>>>>>>>>>>> overdraft segments that each LocalBufferPool can apply for,
>> >>>>>>>>>>>>> right?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers so that we don't add
>> >>>>>>>>>>>>> a new configuration option. I hope to hear more from the
>> >>>>>>>>>>>>> community.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Best wishes
>> >>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan
>> >>>>>>>>>>>>> <1996fan...@gmail.com> wrote:
>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally
>> >>>>>>>>>>>>>> right.
>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as
>> >>>>>>>>>>>>>> the overdraft buffer, so no new buffer configuration is
>> >>>>>>>>>>>>>> needed. Flink users can turn up maxBuffersNumber to control
>> >>>>>>>>>>>>>> the overdraft buffer size.
>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we should
>> >>>>>>>>>>>>>> limit the maximum number of overdraft segments that each
>> >>>>>>>>>>>>>> LocalBufferPool can apply for.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Why do we limit it?
>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable` while
>> >>>>>>>>>>>>>> processing records, such as LegacySource. I have mentioned it
>> >>>>>>>>>>>>>> in FLINK-26759 [1]. I'm not sure if there are other cases.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> If we don't add the limit, the LegacySource will use up all
>> >>>>>>>>>>>>>> remaining memory in the NetworkBufferPool when the
>> >>>>>>>>>>>>>> backpressure is severe.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> How to limit it?
>> >>>>>>>>>>>>>> I prefer to hard-code
>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers limit
>> >>>>>>>>>>>>>> is just for safety, and it should be enough for most Flink
>> >>>>>>>>>>>>>> jobs. Or we can set
>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
>> >>>>>>>>>>>>>> handle some jobs with low parallelism.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Also, if the user doesn't enable Unaligned Checkpoint, we can
>> >>>>>>>>>>>>>> set maxOverdraftBuffers=0 in the constructor of
>> >>>>>>>>>>>>>> LocalBufferPool, because the overdraft isn't useful for the
>> >>>>>>>>>>>>>> Aligned Checkpoint.
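The hard-coded default suggested above could be sketched roughly as follows.
OverdraftDefaults and the unalignedCheckpointEnabled flag are hypothetical
names for illustration; the two formulas (Math.max(numberOfSubpartitions, 10)
and 0 when Unaligned Checkpoint is off) are the ones quoted in the mail, not
an existing Flink method.

```java
/** Hypothetical helper; it only mirrors the formulas proposed in the mail. */
final class OverdraftDefaults {
    private OverdraftDefaults() {}

    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // With aligned checkpoints the overdraft brings no benefit, so disable it.
            return 0;
        }
        // At least 10, so that jobs with low parallelism still get some headroom.
        return Math.max(numberOfSubpartitions, 10);
    }
}
```

For example, a pool with 4 subpartitions would get 10 overdraft buffers, and
one with 128 subpartitions would get 128, as long as Unaligned Checkpoint is
enabled.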
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best wishes
>> >>>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>> >>>>>>>>>>>>>> <kaa....@yandex.com> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hi fanrui,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
>> >>>>>>>>>>>>>>> thoughts about configuration:
>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it,
>> >>>>>>>>>>>>>>> right now we have the following calculation.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory
>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) /
>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>> >>>>>>>>>>>>>>> parallelism + floating buffers) * number of subtasks in the
>> >>>>>>>>>>>>>>> TaskManager
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers in use at the
>> >>>>>>>>>>>>>>> current moment (always <= requiredBuffersNumber)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to
>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably. But
>> >>>>>>>>>>>>>>> if requiredBuffersNumber > maxBuffersNumber, sometimes it is
>> >>>>>>>>>>>>>>> also fine (but not good), since not all required buffers are
>> >>>>>>>>>>>>>>> really mandatory (e.g. it is OK if Flink cannot allocate
>> >>>>>>>>>>>>>>> floating buffers).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover buffers
>> >>>>>>>>>>>>>>> (maxBuffersNumber - requiredBuffersNumber), which I propose
>> >>>>>>>>>>>>>>> to use. (We can actually even use the difference
>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>> >>>>>>>>>>>>>>> TaskManager may contain several operators, including
>> >>>>>>>>>>>>>>> 'window', which can temporarily borrow buffers from the
>> >>>>>>>>>>>>>>> global pool.)
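The calculation above can be written out as a small sketch. BufferBudgetSketch
and its method names are hypothetical. I am also assuming that the network
memory is the configured fraction of the total memory, clamped to the min/max
options; the mail only says it is "calculated via" those options, so treat
that part as an assumption.

```java
/** Hypothetical helper that spells out the buffer arithmetic from the mail. */
final class BufferBudgetSketch {
    private BufferBudgetSketch() {}

    /** Assumption: fraction of the total memory, clamped to [minBytes, maxBytes]. */
    static long networkMemoryBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, byFraction));
    }

    /** maxBuffersNumber = network memory / segment size. */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffersNumber(int exclusiveBuffers, int parallelism, int floatingBuffers, int subtasks) {
        return ((long) exclusiveBuffers * parallelism + floatingBuffers) * subtasks;
    }

    /** Leftover buffers that the proposal would hand out as overdraft (never negative). */
    static long leftoverBuffers(long maxBuffersNumber, long requiredBuffersNumber) {
        return Math.max(0, maxBuffersNumber - requiredBuffersNumber);
    }
}
```

With made-up numbers: 400 segments in total and (2 * 4 + 8) * 10 = 160
required buffers would leave 240 segments that are currently never used and
could serve as overdraft.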
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting
>> >>>>>>>>>>>>>>> buffers while processing a single record; switching to
>> >>>>>>>>>>>>>>> unavailability between records should stay the same as it is
>> >>>>>>>>>>>>>>> now):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is
>> >>>>>>>>>>>>>>> reached, then just ignore this limit and allocate the buffer
>> >>>>>>>>>>>>>>> from any place (from LocalBufferPool if it still has
>> >>>>>>>>>>>>>>> something, otherwise from NetworkBufferPool)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily
>> >>>>>>>>>>>>>>> allocate from NetworkBufferPool while it has something to
>> >>>>>>>>>>>>>>> allocate
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I
>> >>>>>>>>>>>>>>> like it since, on the one hand, it works out of the box
>> >>>>>>>>>>>>>>> without any configuration, and on the other hand, it can be
>> >>>>>>>>>>>>>>> configured by changing the proportion of maxBuffersNumber
>> >>>>>>>>>>>>>>> and requiredBuffersNumber.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> The last thing that I want to say: I don't really want to
>> >>>>>>>>>>>>>>> implement a new configuration option, since even now it is
>> >>>>>>>>>>>>>>> not clear how to correctly configure network buffers with
>> >>>>>>>>>>>>>>> the existing configuration and I don't want to complicate
>> >>>>>>>>>>>>>>> it, especially if it will be possible to resolve the problem
>> >>>>>>>>>>>>>>> automatically (as described above).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers correct?
>> >>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>> Anton Kalashnikov
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>> >>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
>> >>>>>>>>>>>>>>>> Flink. It effectively solves the problem of checkpoint
>> >>>>>>>>>>>>>>>> timeout or slow checkpoint when backpressure is severe.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well
>> >>>>>>>>>>>>>>>> when the backpressure is severe and multiple output buffers
>> >>>>>>>>>>>>>>>> are required to process a single record. FLINK-14396 [2]
>> >>>>>>>>>>>>>>>> also mentioned this issue before. So we propose the
>> >>>>>>>>>>>>>>>> overdraft buffer to solve it.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>> >>>>>>>>>>>>>>>> overdraft buffer mechanism. After discussing with Anton
>> >>>>>>>>>>>>>>>> Kalashnikov, there are still some points to discuss:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>       * There are already a lot of buffer-related
>> >>>>>>>>>>>>>>>>         configurations. Do we need to add a new configuration
>> >>>>>>>>>>>>>>>>         for the overdraft buffer?
>> >>>>>>>>>>>>>>>>       * Where should the overdraft buffer use memory?
>> >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the memory remaining in
>> >>>>>>>>>>>>>>>>         the NetworkBufferPool, no new configuration needs to
>> >>>>>>>>>>>>>>>>         be added.
>> >>>>>>>>>>>>>>>>       * If adding a new configuration:
>> >>>>>>>>>>>>>>>>           o Should we set the overdraft-memory-size at the TM
>> >>>>>>>>>>>>>>>>             level or the Task level?
>> >>>>>>>>>>>>>>>>           o Or set overdraft-buffers to indicate the number of
>> >>>>>>>>>>>>>>>>             memory-segments that can be overdrawn.
>> >>>>>>>>>>>>>>>>           o What is the default value? How to set sensible
>> >>>>>>>>>>>>>>>>             defaults?
>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it using
>> >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at the
>> >>>>>>>>>>>>>>>> Task level, and the default value is 10. That is: each
>> >>>>>>>>>>>>>>>> LocalBufferPool can overdraw up to 10 memory-segments.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>> >>>>>>>>>>>> --
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>> Anton Kalashnikov
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>> --
>> >>>>>>>>>>
>> >>>>>>>>>> Best regards,
>> >>>>>>>>>> Anton Kalashnikov
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >> --
>> >>
>> >> Best regards,
>> >> Anton Kalashnikov
>> >>
>> >>
>>
>
