Hey all,

I have not replied in the thread yet, but I was following the discussion.

Personally, I like Fanrui's and Anton's idea. As far as I understand it, the way to distinguish between inside flatMap & outside would be fairly simple, if maybe slightly indirect. The checkAvailability would remain unchanged and is always checked between separate invocations of the UDF. Therefore the overdraft buffers would not apply there. However, once the pool says it is available, it means it has at least an initial buffer. So any additional request made without checking for availability can be considered to be inside the processing of a single record. This just does not hold for the LegacySource, as I don't think it actually checks for the availability of buffers in the LocalBufferPool.
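
To make that distinction concrete, here is a minimal sketch of the request path under that assumption (hypothetical names and a stand-in Buffer type; the real LocalBufferPool is more involved):

```
import java.util.ArrayDeque;
import java.util.Queue;

/** A minimal sketch of the idea, not the actual LocalBufferPool code. */
class OverdraftSketch {
    static class Buffer {} // stand-in for a network memory segment

    private final Queue<Buffer> availableSegments = new ArrayDeque<>();
    private final Queue<Buffer> globalPool; // stand-in for the shared NetworkBufferPool
    private final int overdraftLimit;       // the safety cap discussed below
    private int overdraftInUse;

    OverdraftSketch(Queue<Buffer> globalPool, int overdraftLimit) {
        this.globalPool = globalPool;
        this.overdraftLimit = overdraftLimit;
    }

    /** Checked only between records: available iff at least one buffer is left. */
    boolean isAvailable() {
        return !availableSegments.isEmpty();
    }

    /**
     * A request that finds the local pool empty must be coming from inside
     * the processing of a single record (availability was true when the
     * record started), so it may draw a bounded overdraft from the global pool.
     */
    Buffer requestBuffer() {
        Buffer buffer = availableSegments.poll();
        if (buffer == null && overdraftInUse < overdraftLimit) {
            buffer = globalPool.poll(); // may be null if the TaskManager is out of memory
            if (buffer != null) {
                overdraftInUse++;
            }
        }
        return buffer; // null means the caller has to block, as it does today
    }
}
```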

In an offline chat with Anton, we also discussed whether we need a limit on the number of buffers we could overdraft (or in other words, whether the limit should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay on the safe side and have it limited. The pool of network buffers is shared for the entire TaskManager, which means it can be shared even across tasks of separate jobs. However, I might just be unnecessarily cautious here.

Best,

Dawid

On 04/05/2022 10:54, Piotr Nowojski wrote:
Hi,

Thanks for the answers.

we may still need to discuss whether the
overdraft/reserve/spare should use extra buffers or buffers
in (exclusive + floating buffers)?
and

These things resolve different problems (at least as I see it).
The current hardcoded "1" says that we switch "availability" to
"unavailability" when one more buffer is left (actually a little less
than one buffer, since we write the last piece of data to this last
buffer). The overdraft feature doesn't change this logic; we still want
to switch to "unavailability" in such a way, but if we are already in
"unavailability" and we want more buffers, then we can take "overdraft
number" more. So we cannot avoid this hardcoded "1", since we need to
understand when we should switch to "unavailability".
Ok, I see. So it seems to me that both of you have in mind keeping the
buffer pools as they are right now, but if we are in the middle of
processing a record, we can request extra overdraft buffers on top of
those? This is a different way to implement the overdraft from what I was
thinking. I was thinking about keeping the "overdraft", or
more precisely a buffer "reserve", in the buffer pool itself. I think my version
would be easier to implement, because it is just fiddling with the min/max
buffers calculation and a slightly modified `checkAvailability()` logic.
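
For comparison, a minimal sketch of that "reserve" variant (hypothetical names; `reserve` would effectively replace today's hardcoded 1):

```
/**
 * A sketch of the "reserve" variant, not a definitive implementation:
 * the pool flips to unavailable while `reserve` buffers are still left,
 * so a record in flight keeps requesting from within the pool itself.
 */
class ReserveSketch {
    private final int currentPoolSize; // max buffers this pool may hand out
    private final int reserve;         // today effectively hardcoded to 1
    private int numberOfRequested;     // buffers currently handed out

    ReserveSketch(int currentPoolSize, int reserve) {
        this.currentPoolSize = currentPoolSize;
        this.reserve = reserve;
    }

    /** Between records: only available while the reserve is untouched. */
    boolean checkAvailability() {
        return numberOfRequested + reserve <= currentPoolSize;
    }

    /** Mid-record: may dip into the reserve until the pool is truly empty. */
    boolean mayRequestMidRecord() {
        return numberOfRequested < currentPoolSize;
    }
}
```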

On the other hand, what you have in mind would better utilise the available
memory, right? It would require more code changes (how would we know when
we are allowed to request the overdraft?). However, in this case, I would
be tempted to set the number of overdraft buffers by default to
`Integer.MAX_VALUE`, and let the system request as many buffers as
necessary. The only downside that I can think of (apart from the higher
complexity) would be a higher chance of hitting a known/unsolved deadlock [1]
in a scenario like this:
- downstream task hasn't yet started
- upstream task requests overdraft and uses all available memory segments
from the global pool
- upstream task is blocked, because the downstream task hasn't started yet and
cannot consume any data
- downstream task tries to start, but cannot, as there are no available
buffers

BTW, for watermark, the number of buffers it needs is
numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
the watermark won't block in requestMemory.
and

the best overdraft size will be equal to parallelism.
That's a lot of buffers. I don't think we need that many for broadcasting
watermarks. Watermarks are small, and remember that every subpartition has
some partially filled/empty WIP buffer, so the vast majority of
subpartitions will not need to request a new buffer.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13203

On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi,


  >> Do you mean to ignore it while processing records, but keep using
`maxBuffersPerChannel` when calculating the availability of the output?


Yes, it is correct.


  >> Would it be a big issue if we changed it to check if at least
"overdraft number of buffers are available", where "overdraft number" is
configurable, instead of the currently hardcoded value of "1"?


These things resolve different problems (at least as I see it).
The current hardcoded "1" says that we switch "availability" to
"unavailability" when one more buffer is left (actually a little less
than one buffer, since we write the last piece of data to this last
buffer). The overdraft feature doesn't change this logic; we still want
to switch to "unavailability" in such a way, but if we are already in
"unavailability" and we want more buffers, then we can take "overdraft
number" more. So we cannot avoid this hardcoded "1", since we need to
understand when we should switch to "unavailability".


-- About "reserve" vs "overdraft"

As Fanrui mentioned above, perhaps the best overdraft size will be
equal to the parallelism. Also, the user can set any value they want. So even
if the parallelism is small (~5) but the user's flatMap produces a lot of
data, the user can set 10 or even more, which almost doubles the max
buffers and makes it impossible to reserve. At least we need to figure
out how to protect against such cases (a limit for the overdraft?). So
actually it looks even more difficult than increasing the maximum buffers.

I want to emphasize that overdraft buffers are a soft configuration, which
means a subtask takes as many buffers as the global buffer pool has
available (maybe zero), but no more than this configured value. It is also
important to notice that, perhaps, not many subtasks in a TaskManager will
be using this feature, so we don't actually need a lot of available
buffers for every subtask (here, I mean that if we have only one
window/flatMap operator and many other operators, then one TaskManager
will have many ordinary subtasks which don't actually need the overdraft and
several subtasks that need this feature). But in the case of reservation,
we would reserve some buffers for all operators even if they don't really
need them.


-- Legacy source problem

If we still want to change max buffers then it is problem for
LegacySources(since every subtask of source will always use these
overdraft). But right now, I think that we can force to set 0 overdraft
buffers for legacy subtasks in configuration during execution(if it is
not too late for changing configuration in this place).


On 03.05.2022 14:11, rui fan wrote:
Hi

Thanks to Martijn Visser and Piotrek for the feedback. I agree with
ignoring the legacy source, otherwise it will affect our design. Users should
use the new Source API as much as possible.

Hi Piotrek, we may still need to discuss whether the
overdraft/reserve/spare should use extra buffers or buffers
in (exclusive + floating buffers)? They have some differences.

If it uses extra buffers:
1. The LocalBufferPool will be available when (usedBuffers + 1
<= currentPoolSize) and all subpartitions don't reach the
maxBuffersPerChannel.

If it uses the buffers in (exclusive + floating buffers):
1. The LocalBufferPool will be available when (usedBuffers +
overdraftBuffers <= currentPoolSize) and all subpartitions
don't reach the maxBuffersPerChannel.
2. For low-parallelism jobs, if overdraftBuffers is large (>8), the
usedBuffers will be small. That is, the LocalBufferPool will
easily become unavailable. For throughput, if users turn up the
overdraft buffers, they need to turn up the exclusive or floating
buffers as well. It also affects the InputChannel, and it is unfriendly
to users.

So I prefer the overdraft to use extra buffers.


BTW, for the watermark, the number of buffers it needs is
numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
the watermark won't block in requestMemory. But it has
2 problems:
1. It needs more overdraft buffers. If the overdraft uses
(exclusive + floating buffers), there will be fewer buffers
available. Throughput may be affected.
2. The numberOfSubpartitions is different for each Task.
So if users want to cover the watermark case using this feature,
they don't know how to set the overdraftBuffers reasonably.
And if the parallelism is changed, users still
need to change overdraftBuffers. It is unfriendly to users.

So I propose we support overdraftBuffers=-1. It means
we will automatically set overdraftBuffers=numberOfSubpartitions
in the constructor of LocalBufferPool.
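
A sketch of how that could be resolved in the constructor (hypothetical names, not a final design):

```
/** A sketch of resolving overdraftBuffers = -1 (hypothetical names). */
class OverdraftConfigSketch {
    static int resolveOverdraftBuffers(int configured, int numberOfSubpartitions) {
        // -1 means "auto": one overdraft buffer per subpartition, which is
        // the worst case for broadcasting a watermark to all subpartitions.
        return configured == -1 ? numberOfSubpartitions : configured;
    }
}
```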

Please correct me if I'm wrong.

Thanks
fanrui

On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org>
wrote:
Hi fanrui,

Do you mean don't add the extra buffers? We just use (exclusive
buffers *
parallelism + floating buffers)? The LocalBufferPool will be available
when
(usedBuffers+overdraftBuffers <=
exclusiveBuffers*parallelism+floatingBuffers)
and all subpartitions don't reach the maxBuffersPerChannel, right?
I'm not sure. Definitely we would need to adjust the minimum number of
required buffers, just as we did when we were implementing the non-blocking
outputs and adding the availability logic to LocalBufferPool. Back then we
added "+ 1" to the minimum number of buffers. Currently this logic is
located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:

int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
For performance reasons, we always require at least one buffer per
sub-partition. Otherwise performance falls drastically. Now if we require 5
overdraft buffers for the output to be available, we need to have them on top
of that "one buffer per sub-partition". So the logic should be changed to:

int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;

Regarding increasing the number of max buffers I'm not sure. It should be fine
as long as "overdraft << max number of buffers", because all buffers on the outputs
are shared across all sub-partitions. If we have 5 overdraft buffers and a
parallelism of 100, it doesn't matter in the grand scheme of things whether we
make the output available when at least one single buffer is available or when at
least 5 buffers are available out of ~200 (100 * 2 + 8). So the effects of
increasing the overdraft from 1 to, for example, 5 should be negligible. For a
small parallelism, like 5, increasing the overdraft from 1 to 5 still increases
the total number of buffers by only about 25% (5 * 2 + 8 = 18 buffers, plus 4
extra). So maybe we can keep the max as it is?

If so, maybe we should change the name from "overdraft" to "buffer reserve"
or "spare buffers"? And document it as the "number of buffers kept in reserve
in case of flatMap/firing timers/huge records"?

What do you think Fanrui, Anton?

Re LegacySources: I agree we can kind of ignore them in the new features,
as long as we don't break the existing deployments too much.

Best,
Piotrek

On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
Hi everyone,

Just wanted to chip in on the discussion of legacy sources: IMHO, we
should
not focus too much on improving/adding capabilities for legacy sources.
We
want to persuade and push users to use the new Source API. Yes, this
means
that there's work required by the end users to port any custom source
to
the new interface. The benefits of the new Source API should outweigh
this.
Anything that we build to support multiple interfaces means adding more
complexity and more possibilities for bugs. Let's try to make our
lives a
little bit easier.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:

Hi Piotrek

Do you mean to ignore it while processing records, but keep using
`maxBuffersPerChannel` when calculating the availability of the
output?
I think yes; Anton Kalashnikov, please help double-check.

+1 for just having this as a separate configuration. Is it a big
problem
that legacy sources would be ignoring it? Note that we already have
effectively hardcoded a single overdraft buffer.
`LocalBufferPool#checkAvailability` checks if there is a single
buffer
available and this works the same for all tasks (including legacy
source
tasks). Would it be a big issue if we changed it to check if at least
"overdraft number of buffers are available", where "overdraft number"
is
configurable, instead of the currently hardcoded value of "1"?
Do you mean don't add the extra buffers? We just use (exclusive
buffers *
parallelism + floating buffers)? The LocalBufferPool will be available
when
(usedBuffers+overdraftBuffers <=
exclusiveBuffers*parallelism+floatingBuffers)
and all subpartitions don't reach the maxBuffersPerChannel, right?

If yes, I think it can solve the problem of the legacy source. There may
be some impact: if overdraftBuffers is large and only one buffer is used
to process a single record, the full (exclusive buffers * parallelism +
floating buffers) cannot be used. It may only be possible to use (exclusive
buffers * parallelism + floating buffers - overdraft buffers + 1). For
throughput, if they turn up the overdraft buffers, Flink users need to turn
up the exclusive or floating buffers as well. And it also affects the
InputChannel.

If not, I don't think it can solve the problem of the legacy source. The
legacy source doesn't check isAvailable; if there are extra buffers, the
legacy source will use them up until it blocks in requestMemory.


Thanks
fanrui

On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

Hi,

+1 for the general proposal from my side. It would be a nice workaround
for the flatMap, WindowOperator and large-record issues with unaligned
checkpoints.

The first task is about ignoring max buffers per channel. This means
if we request a memory segment from the LocalBufferPool and the
maxBuffersPerChannel is reached for this channel, we just ignore that
and continue to allocate buffers while the LocalBufferPool has them (it is
actually not an overdraft).
Do you mean to ignore it while processing records, but keep using
`maxBuffersPerChannel` when calculating the availability of the
output?
The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need a configuration option to limit the overdraft
number (because it is not ok if one subtask allocates all buffers of one
TaskManager, considering that several different jobs can be submitted on
this TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool).
In this case, when the limit of buffers in the LocalBufferPool is reached,
the LocalBufferPool can request up to maxOverdraftBuffersPerPartition
additional buffers from the NetworkBufferPool.
+1 for just having this as a separate configuration. Is it a big
problem
that legacy sources would be ignoring it? Note that we already have
effectively hardcoded a single overdraft buffer.
`LocalBufferPool#checkAvailability` checks if there is a single
buffer
available and this works the same for all tasks (including legacy
source
tasks). Would it be a big issue if we changed it to check if at least
"overdraft number of buffers are available", where "overdraft number"
is
configurable, instead of the currently hardcoded value of "1"?

Best,
Piotrek

On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:

Let me add some information about the LegacySource.

If we want to disable the overdraft buffer for the LegacySource,
could we add an enableOverdraft flag in LocalBufferPool?
The default value would be false. If getAvailableFuture is called,
we change enableOverdraft=true. It indicates whether
isAvailable is checked elsewhere.
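
A minimal sketch of that flag (hypothetical and simplified; the real pool state handling is more involved):

```
import java.util.concurrent.CompletableFuture;

/** A sketch of the proposed flag, not actual Flink code: the overdraft
 *  stays disabled until someone asks for the availability future, which
 *  legacy sources never do. */
class AvailabilityTrackingPoolSketch {
    private final CompletableFuture<?> availabilityFuture = new CompletableFuture<>();
    private volatile boolean enableOverdraft = false; // default: off

    CompletableFuture<?> getAvailableFuture() {
        // The caller evidently checks availability somewhere, so it is
        // safe to let it overdraw between those checks.
        enableOverdraft = true;
        return availabilityFuture;
    }

    boolean mayUseOverdraft() {
        return enableOverdraft;
    }
}
```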

I don't think it is elegant, but it's safe. Please correct me if I'm wrong.
Thanks
fanrui

On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com>
wrote:
Hi,

Thanks for your quick response.

For questions 1/2/3, we think they are clear. We just need to discuss the
default value in the PR.

For the legacy source, you are right, it's difficult to fix in a general
way. Currently, we implement ensureRecordWriterIsAvailable() in
SourceFunction.SourceContext and call it in our common LegacySources,
e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so
fixing FlinkKafkaConsumer solved most of our problems.

Core code:
```
public void ensureRecordWriterIsAvailable() {
    // Nothing to wait for: no writer, unaligned checkpoints disabled,
    // or the output already has a buffer available.
    if (recordWriter == null
            || !configuration.getBoolean(
                    ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
            || recordWriter.isAvailable()) {
        return;
    }

    // Block outside the checkpoint lock until the output has a buffer again.
    CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
    try {
        resumeFuture.get();
    } catch (Throwable ignored) {
    }
}
```

The LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
before entering synchronized (checkpointLock) and collecting records.
Please let me know if there is a better solution.
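
For context, a sketch of the call site in a legacy source's run() loop (simplified; running, Record, fetchNextRecord and the surrounding names are placeholders, not the actual FlinkKafkaConsumer code):

```
// Simplified call-site sketch (placeholder names):
while (running) {
    Record record = fetchNextRecord(); // e.g. poll one record from Kafka
    // Wait for the output outside the lock, so checkpoints can proceed.
    sourceContext.ensureRecordWriterIsAvailable();
    synchronized (checkpointLock) {
        sourceContext.collect(record); // emit while holding the lock
    }
}
```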

Thanks
fanrui

On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi.

-- 1. Do you mean split this into two JIRAs or two PRs or two commits in a PR?

Perhaps a separate ticket will be better, since this task has fewer
questions, but we should find a solution for LegacySource first.

--  2. For the first task, if the Flink user disables the Unaligned
Checkpoint, do we ignore max buffers per channel? Because the overdraft
isn't useful for the Aligned Checkpoint, it still needs to wait for the
downstream Task to consume.

I think that the logic should be the same for AC and UC. As I understand
it, the overdraft is maybe not really helpful for AC, but it doesn't make
it worse either.

--  3. For the second task
       - The default value of maxOverdraftBuffersPerPartition may also
need to be discussed.

I think it should be a pretty small value or even 0, since it is a kind
of optimization and users should understand what they are doing (especially
if we implement the first task).

--     - If the user disables the Unaligned Checkpoint, can we set the
maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
the Aligned Checkpoint.

The same answer as above: if the overdraft doesn't cause degradation for
the Aligned Checkpoint, I don't think that we should make a difference
between AC and UC.

--  4. For the legacy source
       - If enabling the Unaligned Checkpoint, it uses up to
maxOverdraftBuffersPerPartition buffers.
       - If disabling the UC, it doesn't use the overdraft buffer.
       - Do you think it's ok?

Ideally, I don't want to use the overdraft for the LegacySource at all,
since it can lead to undesirable results, especially if the limit is high.
At least, as I understand it, it will always work in overdraft mode and
will borrow maxOverdraftBuffersPerPartition buffers from the global pool,
which can lead to degradation of other subtasks on the same TaskManager.

--     - Actually, we added the checkAvailable logic for LegacySource in
our internal version. It works well.

I don't really understand how that is possible for the general case,
considering that each user has their own implementation of the
LegacySourceOperator.
--  5. For the benchmark, do you have any suggestions? I submitted the PR [1].

I haven't looked at it yet, but I'll try to do it soon.


On 29.04.2022 14:14, rui fan wrote:
Hi,

Thanks for your feedback. I have several questions.

   1. Do you mean split this into two JIRAs or two PRs or two commits in a PR?
   2. For the first task, if the Flink user disables the Unaligned
Checkpoint, do we ignore max buffers per channel? Because the overdraft
isn't useful for the Aligned Checkpoint, it still needs to wait for the
downstream Task to consume.
   3. For the second task
      - The default value of maxOverdraftBuffersPerPartition may also need
to be discussed.
      - If the user disables the Unaligned Checkpoint, can we set the
maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
the Aligned Checkpoint.
   4. For the legacy source
      - If enabling the Unaligned Checkpoint, it uses up to
maxOverdraftBuffersPerPartition buffers.
      - If disabling the UC, it doesn't use the overdraft buffer.
      - Do you think it's ok?
      - Actually, we added the checkAvailable logic for LegacySource in our
internal version. It works well.
   5. For the benchmark, do you have any suggestions? I submitted the PR [1].

[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi,

We discussed it a little with Dawid Wysakowicz. Here are some conclusions:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel. This means
if we request a memory segment from the LocalBufferPool and the
maxBuffersPerChannel is reached for this channel, we just ignore that
and continue to allocate buffers while the LocalBufferPool has them (it is
actually not an overdraft).
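
A minimal sketch of that rule, just to illustrate which limit is ignored when (hypothetical names):

```
/** Sketch of task one: mid-record, the per-channel cap is ignored and only
 *  the pool's own limit still applies (hypothetical names). */
class ChannelCapSketch {
    static boolean mayRequest(
            int buffersInChannel, int maxBuffersPerChannel,
            int usedInPool, int poolSize, boolean insideRecord) {
        if (usedInPool >= poolSize) {
            return false; // the LocalBufferPool itself has nothing left
        }
        // Between records the per-channel cap holds; inside a record it is ignored.
        return insideRecord || buffersInChannel < maxBuffersPerChannel;
    }
}
```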

The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need a configuration option to limit the overdraft
number (because it is not ok if one subtask allocates all buffers of one
TaskManager, considering that several different jobs can be submitted on
this TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool).
In this case, when the limit of buffers in the LocalBufferPool is reached,
the LocalBufferPool can request up to maxOverdraftBuffersPerPartition
additional buffers from the NetworkBufferPool.
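
A sketch of the accounting this implies (hypothetical names, not a definitive implementation; it assumes borrowed segments are repaid to the global pool first on recycling):

```
/** Sketch of the overdraft accounting for task two (hypothetical names). */
class OverdraftAccountingSketch {
    private final int maxOverdraftBuffersPerPartition; // the proposed limit
    private int overdraftInUse;

    OverdraftAccountingSketch(int maxOverdraftBuffersPerPartition) {
        this.maxOverdraftBuffersPerPartition = maxOverdraftBuffersPerPartition;
    }

    /** Called once the LocalBufferPool's own limit is reached. */
    boolean tryBorrowFromGlobalPool() {
        if (overdraftInUse >= maxOverdraftBuffersPerPartition) {
            return false; // cap reached: protect other subtasks on the TaskManager
        }
        overdraftInUse++;
        return true;
    }

    /** On recycle, repay the overdraft before refilling the local pool. */
    boolean repayOnRecycle() {
        if (overdraftInUse > 0) {
            overdraftInUse--;
            return true; // the segment goes straight back to the global pool
        }
        return false; // the segment returns to this LocalBufferPool
    }
}
```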


But it is still not clear how to handle the LegacySource, since it
actually works as an unlimited flatMap and will always work in overdraft
mode, which is not the target. So we still need to think about that.


On 29.04.2022 11:11, rui fan wrote:
Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can apply for, right?

I prefer to hard-code the maxOverdraftBuffers to avoid adding a new
configuration option. And I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:
Hi Anton Kalashnikov,

Thanks for your very clear reply, I think you are totally right.
The 'maxBuffersNumber - buffersInUseNumber' can be used as the
overdraft buffer, so it won't need a new buffer configuration. Flink
users can turn up the maxBuffersNumber to control the overdraft buffer
size. Also, I'd like to add some information: for safety, we should
limit the maximum number of overdraft segments that each LocalBufferPool
can apply for.

Why do we limit it?
Some operators don't check `recordWriter.isAvailable` while processing
records, such as the LegacySource. I have mentioned it in FLINK-26759 [1].
I'm not sure if there are other cases.

If we don't add the limitation, the LegacySource will use up all the
remaining memory in the NetworkBufferPool when the backpressure is severe.
How to limit it?
I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
for safety, and it should be enough for most Flink jobs. Or we can set
`maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
some jobs of low parallelism.

Also, if the user doesn't enable the Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
the overdraft isn't useful for the Aligned Checkpoint.

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi fanrui,

Thanks for creating the FLIP.

In general, I think the overdraft is a good idea and it should help in
the cases described above. Here are my thoughts about the configuration:

Please correct me if I am wrong, but as I understand it, right now we
have the following calculation.

maxBuffersNumber (per TaskManager) = Network memory (calculated via
taskmanager.memory.network.fraction, taskmanager.memory.network.min,
taskmanager.memory.network.max and the total memory size) /
taskmanager.memory.segment-size.

requiredBuffersNumber (per TaskManager) = (exclusive buffers *
parallelism + floating buffers) * number of subtasks in the TaskManager

buffersInUseNumber = the real number of buffers which are used at the
current moment (always <= requiredBuffersNumber)
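
To make the proportions concrete, a worked example with made-up numbers (not taken from any real deployment):

```
/** A made-up numeric example of the calculation above. */
class BufferMathExample {
    public static void main(String[] args) {
        // 128 MB of network memory, 32 KB segments:
        int maxBuffersNumber = (128 * 1024) / 32;                 // 4096 buffers per TM
        // 4 subtasks on this TM, parallelism 100, 2 exclusive + 8 floating each:
        int requiredBuffersNumber = (2 * 100 + 8) * 4;            // 832 buffers
        int leftovers = maxBuffersNumber - requiredBuffersNumber; // 3264 buffers left over
        System.out.println(maxBuffersNumber + " " + requiredBuffersNumber + " " + leftovers);
    }
}
```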

Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which
allows Flink to work predictably. But if requiredBuffersNumber >
maxBuffersNumber it is sometimes also fine (but not good), since not all
required buffers are really mandatory (e.g. it is ok if Flink cannot
allocate floating buffers).

But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink
just never uses these leftover buffers (maxBuffersNumber -
requiredBuffersNumber), which I propose to use. (We can actually even use
the difference 'requiredBuffersNumber - buffersInUseNumber', since one
TaskManager can contain several operators, including 'window', which can
temporarily borrow buffers from the global pool.)

My proposal, more specifically (it relates only to requesting buffers
during the processing of a single record, while switching to unavailability
between records should stay the same as we have it now):

* If one more buffer is requested but maxBuffersPerChannel is reached, then
just ignore this limitation and allocate this buffer from any place (from
the LocalBufferPool if it still has something, otherwise from the
NetworkBufferPool)

* If the LocalBufferPool exceeds its limit, then temporarily allocate from
the NetworkBufferPool while it has something to allocate


Maybe I missed something and this solution won't work, but I like it
since, on the one hand, it works from scratch without any configuration,
and on the other hand, it can be configured by changing the proportion of
maxBuffersNumber and requiredBuffersNumber.

The last thing that I want to say: I don't really want to implement a new
configuration option, since even now it is not clear how to correctly
configure the network buffers with the existing configuration, and I don't
want to complicate it further, especially if it is possible to resolve the
problem automatically (as described above).


So is my understanding of network memory/buffers correct?
--

Best regards,
Anton Kalashnikov

On 27.04.2022 07:46, rui fan wrote:
Hi everyone,

Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
effectively solves the problem of checkpoint timeouts or slow checkpoints
when backpressure is severe.

We found that UC (Unaligned Checkpoint) does not work well when the
backpressure is severe and multiple output buffers are required to process
a single record. FLINK-14396 [2] also mentioned this issue before. So we
propose the overdraft buffer to solve it.

I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft buffer
mechanism. After discussing with Anton Kalashnikov, there are still some
points to discuss:

* There are already a lot of buffer-related configurations. Do we need
to add a new configuration for the overdraft buffer?
* Where should the overdraft buffer take its memory from?
* If the overdraft buffer uses the memory remaining in the
NetworkBufferPool, no new configuration needs to be added.
* If adding a new configuration:
    o Should we set the overdraft-memory-size at the TM level or the
Task level?
    o Or set overdraft-buffers to indicate the number of memory segments
that can be overdrawn.
    o What is the default value? How to set sensible defaults?
Currently, I have implemented a POC [5] and verified it using
flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
and the default value is 10. That is: each LocalBufferPool can overdraw
up to 10 memory segments.

Looking forward to your feedback!

Thanks,
fanrui

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-14396
[3] https://issues.apache.org/jira/browse/FLINK-26762
[4]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[5]

https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
[6] https://github.com/apache/flink-benchmarks/pull/54
--

Best regards,
Anton Kalashnikov


--

Best regards,
Anton Kalashnikov


--

Best regards,
Anton Kalashnikov

