Hi,
Thanks a lot for the discussion.
After several rounds of discussion, I think the design is clear now. I
updated the "Proposed Changes" section of FLIP-227 [1]. If I missed
something, please add it to the FLIP directly, or mention it in this
thread and I will add it. If everything is OK, I will create a new
JIRA for the first task and use FLINK-26762 [2] for the second task.
About the legacy source: do we set maxOverdraftBuffersPerGate=0
directly? If so, how do we identify a LegacySource? Alternatively,
could we add an overdraftEnabled flag to LocalBufferPool? Its default
value would be false, and it would be set to true the first time
getAvailableFuture is called. The flag then indicates whether any
caller checks isAvailable at all. This might be more general, since
it can cover more cases than just the legacy source.
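To make the overdraftEnabled idea concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: OverdraftFlagSketch and effectiveOverdraftLimit are hypothetical names, not Flink's real LocalBufferPool API, and thread-safety is ignored.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for LocalBufferPool; names are illustrative only.
final class OverdraftFlagSketch {
    private final int maxOverdraftBuffers;
    // Stays false until some caller asks for the availability future.
    private boolean overdraftEnabled = false;
    private final CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    OverdraftFlagSketch(int maxOverdraftBuffers) {
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    // A caller that checks availability opts in to the overdraft.
    CompletableFuture<Void> getAvailableFuture() {
        overdraftEnabled = true;
        return availableFuture;
    }

    // Overdraft limit that a buffer request would be allowed to use.
    int effectiveOverdraftLimit() {
        return overdraftEnabled ? maxOverdraftBuffers : 0;
    }
}
```

With this shape, a source that never calls getAvailableFuture keeps an effective overdraft of 0 without having to be identified as a legacy source.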
Also, I think the default value of 'max-overdraft-buffers-per-gate'
needs to be confirmed. I prefer it to be between 5 and 10. What
do you think?
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[2] https://issues.apache.org/jira/browse/FLINK-26762
Thanks
fanrui
On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org>
wrote:
Hi again,
After sleeping over this, if both versions (reserve and overdraft)
have the same complexity, I would also prefer the overdraft.
> `Integer.MAX_VALUE` as default value was my idea as well but now, as
> Dawid mentioned, I think it is dangerous since it is too implicit for
> the user and if the user submits one more job for the same TaskManager
As I mentioned, it's not only an issue with multiple jobs. The
same problem can happen with different subtasks from the same job,
potentially leading to the FLINK-13203 deadlock [1]. With
FLINK-13203 fixed, I would be in favour of Integer.MAX_VALUE to be
the default value, but as it is, I think we should indeed play on
the safe side and limit it.
> I still don't understand how should be limited "reserve" implementation.
> I mean if we have X buffers in total and the user sets overdraft equal
> to X we obviously can not reserve all buffers, but how many we are
> allowed to reserve? Should it be a different configuration like
> percentegeForReservedBuffers?
The reserve could be defined as a percentage, or as a fixed number
of buffers. But yes. In normal operation a subtask would not use the
reserve, because if numberOfAvailableBuffers < reserve, the output
would not be available. Only in the flatMap/timers/huge records
case could the reserve be used.
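A minimal Java sketch of the "reserve" variant, under stated assumptions: ReserveSketch, isAvailable and reserveFromPercentage are hypothetical helpers rather than existing Flink methods, and the clamp to at least 1 and below the pool size is a choice of the sketch, not something specified in this thread.

```java
// Hypothetical helper; it only illustrates the availability rule for a reserve.
final class ReserveSketch {
    private ReserveSketch() {}

    // The output counts as available only while at least `reserve` buffers are free.
    static boolean isAvailable(int numberOfAvailableBuffers, int reserve) {
        return numberOfAvailableBuffers >= reserve;
    }

    // Reserve defined as a percentage of the pool, clamped to [1, totalBuffers - 1].
    static int reserveFromPercentage(int totalBuffers, int percent) {
        if (totalBuffers < 2 || percent < 0 || percent > 100) {
            throw new IllegalArgumentException("need totalBuffers >= 2 and 0 <= percent <= 100");
        }
        int reserve = totalBuffers * percent / 100;
        return Math.max(1, Math.min(reserve, totalBuffers - 1));
    }
}
```

For example, a pool of 20 buffers with a 25% reserve becomes unavailable as soon as fewer than 5 buffers are free, and the remaining buffers can then only be consumed while a single record is being processed.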
> 1. If the total buffers of LocalBufferPool <= the reserve buffers,
> will LocalBufferPool never be available? Can't process data?
Of course we would need to make sure that never happens. So the
reserve should be < total buffer size.
> 2. If the overdraft buffer use the extra buffers, when the downstream
> task inputBuffer is insufficient, it should fail to start the job, and
> then restart? When the InputBuffer is initialized, it will apply for
> enough buffers, right?
The failover if downstream can not allocate buffers is already
implemented in FLINK-14872 [2]. There is a timeout for how long the
task waits for buffer allocation. However, this doesn't prevent
many (potentially infinitely many) deadlock/restart cycles. IMO the
proper solution for [1] would be 2b described in the ticket:
> 2b. Assign extra buffers only once all of the tasks are RUNNING.
> This is a simplified version of 2a, without tracking the tasks
> sink-to-source.
But that's a pre-existing problem and I don't think we have to
solve it before implementing overdraft. I think we would need to
solve it only before setting Integer.MAX_VALUE as the default for
the overdraft. For the same reason, I would hesitate to set the
overdraft to anything more than a couple of buffers by default.
> Actually, I totally agree that we don't need a lot of buffers for
> overdraft
and
> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> When we finish this feature and after users use it, if users feedback
> this issue we can discuss again.
+1
Piotrek
[1] https://issues.apache.org/jira/browse/FLINK-13203
[2] https://issues.apache.org/jira/browse/FLINK-14872
On Thu, May 5, 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
Hi everyone,
I still have some questions.
1. If the total number of buffers in the LocalBufferPool <= the
reserve buffers, will the LocalBufferPool never be available? Will it
be unable to process data?
2. If the overdraft uses extra buffers and the downstream task's
input buffers are then insufficient, should the job fail to start and
then restart? When the InputBuffer is initialized, it will request
enough buffers, right?
Also, I agree that we ignore overdraftBuffers=numberOfSubpartitions
for now. Once this feature is finished and users have tried it, we can
discuss again if they report this issue.
Thanks
fanrui
On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz
<dwysakow...@apache.org> wrote:
Hey all,
I have not replied in the thread yet, but I was following the
discussion.
Personally, I like Fanrui's and Anton's idea. As far as I understand
it, the idea to distinguish between inside flatMap & outside would be
fairly simple, but maybe slightly indirect. The checkAvailability
would remain unchanged and it is checked always between separate
invocations of the UDF. Therefore the overdraft buffers would not
apply there. However once the pool says it is available, it means it
has at least an initial buffer. So any additional request without
checking for availability can be considered to be inside of processing
a single record. This does not hold just for the LegacySource as I
don't think it actually checks for the availability of buffers in the
LocalBufferPool.
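The distinction described above can be sketched roughly as follows. This is a simplified, single-threaded model under stated assumptions: OverdraftPoolSketch and its fields are hypothetical, the global pool is reduced to a counter, and a real pool would block instead of returning false.

```java
// Hypothetical model of a local pool with a bounded overdraft; not Flink's real class.
final class OverdraftPoolSketch {
    private final int poolSize;            // regular buffers of this local pool
    private final int maxOverdraftBuffers; // configured overdraft limit
    private int globalFreeBuffers;         // stand-in for the shared NetworkBufferPool
    private int usedBuffers;
    private int overdraftInUse;

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    // Checked between UDF invocations; the overdraft does not change this rule.
    boolean isAvailable() {
        return usedBuffers + 1 <= poolSize;
    }

    // Called while a record is being processed, without re-checking availability.
    boolean requestBuffer() {
        if (usedBuffers < poolSize) {
            usedBuffers++;
            return true;
        }
        // Regular buffers are exhausted: borrow from the global pool, up to the limit.
        if (overdraftInUse < maxOverdraftBuffers && globalFreeBuffers > 0) {
            globalFreeBuffers--;
            overdraftInUse++;
            return true;
        }
        return false; // a real pool would block the caller here
    }
}
```

The overdraft is a soft limit in this model: if the global pool has nothing left, the request fails even though the configured limit has not been reached.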
In the offline chat with Anton, we also discussed if we need a limit
of the number of buffers we could overdraft (or in other words if the
limit should be equal to Integer.MAX_VALUE), but personally I'd prefer
to stay on the safe side and have it limited. The pool of network
buffers is shared for the entire TaskManager, so it means it can be
shared even across tasks of separate jobs. However, I might be just
unnecessarily cautious here.
Best,
Dawid
On 04/05/2022 10:54, Piotr Nowojski wrote:
> Hi,
>
> Thanks for the answers.
>
>> we may still need to discuss whether the
>> overdraft/reserve/spare should use extra buffers or buffers
>> in (exclusive + floating buffers)?
> and
>
>> These things resolve the different problems (at least as I see that).
>> The current hardcoded "1" says that we switch "availability" to
>> "unavailability" when one more buffer is left(actually a little less
>> than one buffer since we write the last piece of data to this last
>> buffer). The overdraft feature doesn't change this logic we still want
>> to switch to "unavailability" in such a way but if we are already in
>> "unavailability" and we want more buffers then we can take "overdraft
>> number" more. So we can not avoid this hardcoded "1" since we need to
>> understand when we should switch to "unavailability"
> Ok, I see. So it seems to me that both of you have in mind to keep the
> buffer pools as they are right now, but if we are in the middle of
> processing a record, we can request extra overdraft buffers on top of
> those? This is another way to implement the overdraft to what I was
> thinking. I was thinking about something like keeping the "overdraft" or
> more precisely buffer "reserve" in the buffer pool. I think my version
> would be easier to implement, because it is just fiddling with min/max
> buffers calculation and slightly modified `checkAvailability()` logic.
>
> On the other hand what you have in mind would better utilise the available
> memory, right? It would require more code changes (how would we know when
> we are allowed to request the overdraft?). However, in this case, I would
> be tempted to set the number of overdraft buffers by default to
> `Integer.MAX_VALUE`, and let the system request as many buffers as
> necessary. The only downside that I can think of (apart from higher
> complexity) would be a higher chance of hitting a known/unsolved deadlock [1]
> in a scenario:
> - downstream task hasn't yet started
> - upstream task requests overdraft and uses all available memory segments
> from the global pool
> - upstream task is blocked, because downstream task hasn't started yet and
> can not consume any data
> - downstream task tries to start, but can not, as there are no available
> buffers
>
>> BTW, for watermark, the number of buffers it needs is
>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> the watermark won't block in requestMemory.
> and
>
>> the best overdraft size will be equal to parallelism.
> That's a lot of buffers. I don't think we need that many for broadcasting
> watermarks. Watermarks are small, and remember that every subpartition has
> some partially filled/empty WIP buffer, so the vast majority of
> subpartitions will not need to request a new buffer.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-13203
>
> On Tue, May 3, 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>
>> Hi,
>>
>>
>> >> Do you mean to ignore it while processing records, but keep using
>> >> `maxBuffersPerChannel` when calculating the availability of the output?
>>
>>
>> Yes, it is correct.
>>
>>
>> >> Would it be a big issue if we changed it to check if at least
>> >> "overdraft number of buffers are available", where "overdraft number" is
>> >> configurable, instead of the currently hardcoded value of "1"?
>>
>>
>> These things resolve the different problems (at least as I see that).
>> The current hardcoded "1" says that we switch "availability" to
>> "unavailability" when one more buffer is left(actually a little less
>> than one buffer since we write the last piece of data to this last
>> buffer). The overdraft feature doesn't change this logic we still want
>> to switch to "unavailability" in such a way but if we are already in
>> "unavailability" and we want more buffers then we can take "overdraft
>> number" more. So we can not avoid this hardcoded "1" since we need to
>> understand when we should switch to "unavailability"
>>
>>
>> -- About "reserve" vs "overdraft"
>>
>> As Fanrui mentioned above, perhaps, the best overdraft size will be
>> equal to parallelism. Also, the user can set any value he wants. So even
>> if parallelism is small(~5) but the user's flatmap produces a lot of
>> data, the user can set 10 or even more, which almost doubles the max
>> buffers and will be impossible to reserve. At least we need to figure
>> out how to protect from such cases (the limit for an overdraft?). So
>> actually it looks even more difficult than increasing the maximum buffers.
>>
>> I want to emphasize that overdraft buffers are a soft configuration, which
>> means it takes as many buffers as the global buffers pool has
>> available(maybe zero) but no more than this configured value. It is also
>> important to notice that perhaps, not many subtasks in TaskManager will
>> be using this feature so we don't actually need a lot of available
>> buffers for every subtask(Here, I mean that if we have only one
>> window/flatmap operator and many other operators, then one TaskManager
>> will have many ordinary subtasks which don't actually need overdraft and
>> several subtasks that need this feature). But in case of reservation,
>> we will reserve some buffers for all operators even if they don't really
>> need it.
>>
>>
>> -- Legacy source problem
>>
>> If we still want to change max buffers then it is a problem for
>> LegacySources(since every subtask of source will always use these
>> overdraft). But right now, I think that we can force to set 0 overdraft
>> buffers for legacy subtasks in configuration during execution(if it is
>> not too late for changing configuration in this place).
>>
>>
>> On 03.05.2022 14:11, rui fan wrote:
>>> Hi
>>>
>>> Thanks for Martijn Visser and Piotrek's feedback. I agree with
>>> ignoring the legacy source, it will affect our design. User should
>>> use the new Source Api as much as possible.
>>>
>>> Hi Piotrek, we may still need to discuss whether the
>>> overdraft/reserve/spare should use extra buffers or buffers
>>> in (exclusive + floating buffers)? They have some differences.
>>>
>>> If it uses extra buffers:
>>> 1. The LocalBufferPool will be available when (usedBuffers + 1
>>> <= currentPoolSize) and all subpartitions don't reach the
>>> maxBuffersPerChannel.
>>>
>>> If it uses the buffers in (exclusive + floating buffers):
>>> 1. The LocalBufferPool will be available when (usedBuffers +
>>> overdraftBuffers <= currentPoolSize) and all subpartitions
>>> don't reach the maxBuffersPerChannel.
>>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the
>>> usedBuffers will be small. That is the LocalBufferPool will be
>>> easily unavailable. For throughput, if users turn up the
>>> overdraft buffers, they need to turn up exclusive or floating
>>> buffers. It also affects the InputChannel, and it is unfriendly
>>> to users.
>>>
>>> So I prefer the overdraft to use extra buffers.
>>>
>>>
>>> BTW, for watermark, the number of buffers it needs is
>>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>>> the watermark won't block in requestMemory. But it has
>>> 2 problems:
>>> 1. It needs more overdraft buffers. If the overdraft uses
>>> (exclusive + floating buffers), there will be fewer buffers
>>> available. Throughput may be affected.
>>> 2. The numberOfSubpartitions is different for each Task.
>>> So if users want to cover watermark using this feature,
>>> they don't know how to set the overdraftBuffers more
>>> reasonably. And if the parallelism is changed, users still
>>> need to change overdraftBuffers. It is unfriendly to users.
>>>
>>> So I propose we support overdraftBuffers=-1. It means
>>> we will automatically set overdraftBuffers=numberOfSubpartitions
>>> in the Constructor of LocalBufferPool.
>>>
>>> Please correct me if I'm wrong.
>>>
>>> Thanks
>>> fanrui
>>>
>>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>> Hi fanrui,
>>>>
>>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers *
>>>>> parallelism + floating buffers)? The LocalBufferPool will be available when
>>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>>> I'm not sure. Definitely we would need to adjust the minimum number of the
>>>> required buffers, just as we did when we were implementing the non blocking
>>>> outputs and adding availability logic to LocalBufferPool. Back then we
>>>> added "+ 1" to the minimum number of buffers. Currently this logic is
>>>> located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>>>>
>>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>>>> For performance reasons, we always require at least one buffer per
>>>> sub-partition. Otherwise performance falls drastically. Now if we require 5
>>>> overdraft buffers for output to be available, we need to have them on top
>>>> of those "one buffer per sub-partition". So the logic should be changed to:
>>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions +
>>>>> numOverdraftBuffers;
>>>>
>>>> Regarding increasing the number of max buffers I'm not sure it is needed,
>>>> as long as "overdraft << max number of buffers", because all buffers on the
>>>> outputs are shared across all sub-partitions. If we have 5 overdraft buffers,
>>>> and parallelism of 100, it doesn't matter in the grand scheme of things if we
>>>> make the output available if at least one single buffer is available or at
>>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So effects of
>>>> increasing the overdraft from 1 to for example 5 should be negligible. For
>>>> small parallelism, like 5, increasing overdraft from 1 to 5 still increases
>>>> the overdraft by only about 25%. So maybe we can keep the max as it is?
>>>>
>>>> If so, maybe we should change the name from "overdraft" to "buffer reserve"
>>>> or "spare buffers"? And document it as "number of buffers kept in reserve
>>>> in case of flatMap/firing timers/huge records"?
>>>>
>>>> What do you think Fanrui, Anton?
>>>>
>>>> Re LegacySources. I agree we can kind of ignore them in the new features,
>>>> as long as we don't break the existing deployments too much.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Tue, May 3, 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>>>>> Hi everyone,
>>>>>
>>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we should
>>>>> not focus too much on improving/adding capabilities for legacy sources. We
>>>>> want to persuade and push users to use the new Source API. Yes, this means
>>>>> that there's work required by the end users to port any custom source to
>>>>> the new interface. The benefits of the new Source API should outweigh this.
>>>>> Anything that we build to support multiple interfaces means adding more
>>>>> complexity and more possibilities for bugs. Let's try to make our lives a
>>>>> little bit easier.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn Visser
>>>>> https://twitter.com/MartijnVisser82
>>>>> https://github.com/MartijnVisser
>>>>>
>>>>>
>>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>>>>>
>>>>>> Hi Piotrek
>>>>>>
>>>>>>> Do you mean to ignore it while processing records, but keep using
>>>>>>> `maxBuffersPerChannel` when calculating the availability of the output?
>>>>>> I think yes. Anton Kalashnikov, could you please double check?
>>>>>>
>>>>>>> +1 for just having this as a separate configuration. Is it a big problem
>>>>>>> that legacy sources would be ignoring it? Note that we already have
>>>>>>> effectively hardcoded a single overdraft buffer.
>>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single buffer
>>>>>>> available and this works the same for all tasks (including legacy source
>>>>>>> tasks). Would it be a big issue if we changed it to check if at least
>>>>>>> "overdraft number of buffers are available", where "overdraft number" is
>>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers *
>>>>>> parallelism + floating buffers)? The LocalBufferPool will be available when
>>>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>>>>>>
>>>>>> If yes, I think it can solve the problem of legacy source. There may be
>>>>>> some impact. If overdraftBuffers is large and only one buffer is used to
>>>>>> process a single record, exclusive buffers*parallelism + floating buffers
>>>>>> cannot be used. It may only be possible to use (exclusive buffers *
>>>>>> parallelism + floating buffers - overdraft buffers + 1). For throughput,
>>>>>> if the overdraft buffers are turned up, the flink user needs to turn up
>>>>>> exclusive or floating buffers. And it also affects the InputChannel.
>>>>>>
>>>>>> If not, I don't think it can solve the problem of legacy source. The legacy
>>>>>> source doesn't check isAvailable. If there are extra buffers, the legacy
>>>>>> source will use them up until it blocks in requestMemory.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> fanrui
>>>>>>
>>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> +1 for the general proposal from my side. It would be a nice workaround
>>>>>>> for flatMaps, WindowOperators and large records issues with unaligned
>>>>>>> checkpoints.
>>>>>>>
>>>>>>>> The first task is about ignoring max buffers per channel. This means if
>>>>>>>> we request a memory segment from LocalBufferPool and the
>>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore that
>>>>>>>> and continue to allocate buffer while LocalBufferPool has it(it is
>>>>>>>> actually not an overdraft).
>>>>>>> Do you mean to ignore it while processing records, but keep using
>>>>>>> `maxBuffersPerChannel` when calculating the availability of the output?
>>>>>>>> The second task is about the real overdraft. I am pretty convinced now
>>>>>>>> that we, unfortunately, need configuration for limitation of overdraft
>>>>>>>> number(because it is not ok if one subtask allocates all buffers of one
>>>>>>>> TaskManager considering that several different jobs can be submitted on
>>>>>>>> this TaskManager). So idea is to have
>>>>>>>> maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool).
>>>>>>>> In this case, when a limit of buffers in LocalBufferPool is reached,
>>>>>>>> LocalBufferPool can request additionally from NetworkBufferPool up to
>>>>>>>> maxOverdraftBuffersPerPartition buffers.
>>>>>>> +1 for just having this as a separate configuration. Is it a big problem
>>>>>>> that legacy sources would be ignoring it? Note that we already have
>>>>>>> effectively hardcoded a single overdraft buffer.
>>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single buffer
>>>>>>> available and this works the same for all tasks (including legacy source
>>>>>>> tasks). Would it be a big issue if we changed it to check if at least
>>>>>>> "overdraft number of buffers are available", where "overdraft number" is
>>>>>>> configurable, instead of the currently hardcoded value of "1"?
>>>>>>>
>>>>>>> Best,
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On Fri, Apr 29, 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Let me add some information about the LegacySource.
>>>>>>>>
>>>>>>>> If we want to disable the overdraft buffer for LegacySource,
>>>>>>>> could we add the enableOverdraft in LocalBufferPool?
>>>>>>>> The default value is false. If the getAvailableFuture is called,
>>>>>>>> change enableOverdraft=true. It indicates whether there are
>>>>>>>> checks isAvailable elsewhere.
>>>>>>>>
>>>>>>>> I don't think it is elegant, but it's safe. Please correct me if I'm
>>>>>>>> wrong.
>>>>>>>> Thanks
>>>>>>>> fanrui
>>>>>>>>
>>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks for your quick response.
>>>>>>>>>
>>>>>>>>> For question 1/2/3, we think they are clear. We just need to discuss the
>>>>>>>>> default value in PR.
>>>>>>>>>
>>>>>>>>> For the legacy source, you are right. It's difficult for general
>>>>>>>>> implementation.
>>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in
>>>>>>>>> SourceFunction.SourceContext. And call it in our common LegacySource,
>>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so
>>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems.
>>>>>>>>>
>>>>>>>>> Core code:
>>>>>>>>> ```
>>>>>>>>> public void ensureRecordWriterIsAvailable() {
>>>>>>>>>     // Nothing to wait for without a writer, with unaligned
>>>>>>>>>     // checkpoints disabled, or when the writer is available.
>>>>>>>>>     if (recordWriter == null
>>>>>>>>>             || !configuration.getBoolean(
>>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>>>>>>>>>             || recordWriter.isAvailable()) {
>>>>>>>>>         return;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     // Block the source thread until the writer is available again.
>>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>>>>>>>>>     try {
>>>>>>>>>         resumeFuture.get();
>>>>>>>>>     } catch (Throwable ignored) {
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
>>>>>>>>> before synchronized (checkpointLock) and collects records.
>>>>>>>>> Please let me know if there is a better solution.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> fanrui
>>>>>>>>>
>>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi.
>>>>>>>>>>
>>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two commits
>>>>>>>>>> in a PR?
>>>>>>>>>>
>>>>>>>>>> Perhaps, the separated ticket will be better since this task has fewer
>>>>>>>>>> questions but we should find a solution for LegacySource first.
>>>>>>>>>>
>>>>>>>>>> -- 2. For the first task, if the flink user disables the Unaligned
>>>>>>>>>> Checkpoint, do we ignore max buffers per channel? Because the overdraft
>>>>>>>>>> isn't useful for the Aligned Checkpoint, it still needs to wait for
>>>>>>>>>> downstream Task to consume.
>>>>>>>>>>
>>>>>>>>>> I think that the logic should be the same for AC and UC. As I understand,
>>>>>>>>>> the overdraft maybe is not really helpful for AC but it doesn't make it
>>>>>>>>>> worse as well.
>>>>>>>>>>
>>>>>>>>>> 3. For the second task
>>>>>>>>>> -- - The default value of maxOverdraftBuffersPerPartition may also need
>>>>>>>>>> to be discussed.
>>>>>>>>>>
>>>>>>>>>> I think it should be a pretty small value or even 0 since it is a kind of
>>>>>>>>>> optimization and user should understand what they do(especially if we
>>>>>>>>>> implement the first task).
>>>>>>>>>>
>>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, can we set the
>>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
>>>>>>>>>> the Aligned Checkpoint.
>>>>>>>>>>
>>>>>>>>>> The same answer as above: if the overdraft doesn't cause degradation for
>>>>>>>>>> the Aligned Checkpoint, I don't think that we should differentiate between
>>>>>>>>>> AC and UC.
>>>>>>>>>>
>>>>>>>>>> 4. For the legacy source
>>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up to
>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
>>>>>>>>>> - If disabling the UC, it doesn't use the overdraft buffer.
>>>>>>>>>> - Do you think it's ok?
>>>>>>>>>>
>>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all since it
>>>>>>>>>> can lead to undesirable results especially if the limit is high. At least,
>>>>>>>>>> as I understand, it will always work in overdraft mode and it will borrow
>>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool which can lead
>>>>>>>>>> to degradation of other subtasks on the same TaskManager.
>>>>>>>>>>
>>>>>>>>>> -- - Actually, we added the checkAvailable logic for LegacySource in our
>>>>>>>>>> internal version. It works well.
>>>>>>>>>>
>>>>>>>>>> I don't really understand how it is possible for general case considering
>>>>>>>>>> that each user has their own implementation of LegacySourceOperator
>>>>>>>>>>
>>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? I submitted the PR
>>>>>>>>>> [1].
>>>>>>>>>>
>>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 29.04.2022 14:14, rui fan wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your feedback. I have several questions.
>>>>>>>>>>>
>>>>>>>>>>> 1. Do you mean split this into two JIRAs or two PRs or two commits
>>>>>>>>>>> in a PR?
>>>>>>>>>>> 2. For the first task, if the flink user disables the Unaligned
>>>>>>>>>>> Checkpoint, do we ignore max buffers per channel? Because the overdraft
>>>>>>>>>>> isn't useful for the Aligned Checkpoint, it still needs to wait for
>>>>>>>>>>> downstream Task to consume.
>>>>>>>>>>> 3. For the second task
>>>>>>>>>>>    - The default value of maxOverdraftBuffersPerPartition may also need
>>>>>>>>>>>      to be discussed.
>>>>>>>>>>>    - If the user disables the Unaligned Checkpoint, can we set the
>>>>>>>>>>>      maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
>>>>>>>>>>>      useful for the Aligned Checkpoint.
>>>>>>>>>>> 4. For the legacy source
>>>>>>>>>>>    - If enabling the Unaligned Checkpoint, it uses up to
>>>>>>>>>>>      maxOverdraftBuffersPerPartition buffers.
>>>>>>>>>>>    - If disabling the UC, it doesn't use the overdraft buffer.
>>>>>>>>>>>    - Do you think it's ok?
>>>>>>>>>>>    - Actually, we added the checkAvailable logic for LegacySource in our
>>>>>>>>>>>      internal version. It works well.
>>>>>>>>>>> 5. For the benchmark, do you have any suggestions? I submitted the PR
>>>>>>>>>>> [1].
>>>>>>>>>>>
>>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> fanrui
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some
>>>>>>>>>>>> conclusions:
>>>>>>>>>>>>
>>>>>>>>>>>> First of all, let's split this into two tasks.
>>>>>>>>>>>>
>>>>>>>>>>>> The first task is about ignoring max buffers per channel. This means if
>>>>>>>>>>>> we request a memory segment from LocalBufferPool and the
>>>>>>>>>>>> maxBuffersPerChannel is reached for this channel, we just ignore that
>>>>>>>>>>>> and continue to allocate buffer while LocalBufferPool has it(it is
>>>>>>>>>>>> actually not an overdraft).
>>>>>>>>>>>>
>>>>>>>>>>>> The second task is about the real overdraft. I am pretty convinced now
>>>>>>>>>>>> that we, unfortunately, need configuration for limitation of overdraft
>>>>>>>>>>>> number(because it is not ok if one subtask allocates all buffers of one
>>>>>>>>>>>> TaskManager considering that several different jobs can be submitted on
>>>>>>>>>>>> this TaskManager). So idea is to have
>>>>>>>>>>>> maxOverdraftBuffersPerPartition(technically to say per LocalBufferPool).
>>>>>>>>>>>> In this case, when a limit of buffers in LocalBufferPool is reached,
>>>>>>>>>>>> LocalBufferPool can request additionally from NetworkBufferPool up to
>>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> But it is still not clear how to handle LegacySource since it actually
>>>>>>>>>>>> works as unlimited flatmap and it will always work in overdraft mode
>>>>>>>>>>>> which is not a target. So we still need to think about that.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 29.04.2022 11:11, rui fan wrote:
>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think you agree that we should limit the maximum number of overdraft
>>>>>>>>>>>>> segments that each LocalBufferPool can apply for, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I prefer to hard code the maxOverdraftBuffers to avoid adding a new
>>>>>>>>>>>>> configuration. And I hope to hear more from the community.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best wishes
>>>>>>>>>>>>> fanrui
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your very clear reply, I think
you are totally
>>>>>> right.
>>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber'
can be used as
>>>>> the
>>>>>>>>>>>>>> overdraft buffer, it won't need the new buffer
>>>>>>> configuration.Flink
>>>>>>>>>> users
>>>>>>>>>>>>>> can turn up the maxBuffersNumber to control
the overdraft
>>>>>> buffer
>>>>>>>>>> size.
>>>>>>>>>>>>>> Also, I‘d like to add some information. For
safety, we
>>>> should
>>>>>>> limit
>>>>>>>>>> the
>>>>>>>>>>>>>> maximum number of overdraft segments that each
>>>>> LocalBufferPool
>>>>>>>>>>>>>> can apply for.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Why do we limit it?
>>>>>>>>>>>>>> Some operators, such as LegacySource, don't check
>>>>>>>>>>>>>> `recordWriter.isAvailable` while processing records. I mentioned this
>>>>>>>>>>>>>> in FLINK-26759 [1]. I'm not sure whether there are other cases.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Without the limit, LegacySource will use up all remaining memory in
>>>>>>>>>>>>>> the NetworkBufferPool when the backpressure is severe.
>>>>>>>>>>>>>> How to limit it?
>>>>>>>>>>>>>> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
>>>>>>>>>>>>>> in the constructor of LocalBufferPool. maxOverdraftBuffers is just
>>>>>>>>>>>>>> a safety limit, and it should be enough for most Flink jobs. Or we
>>>>>>>>>>>>>> can set `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
>>>>>>>>>>>>>> to handle jobs with low parallelism.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, if the user doesn't enable Unaligned Checkpoint, we can set
>>>>>>>>>>>>>> maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
>>>>>>>>>>>>>> the overdraft isn't useful for Aligned Checkpoint.
>>>>>>>>>>>>>>
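The limit proposed in the mail above can be written down as a small helper. This is only a sketch of the rule as stated there, not Flink's actual LocalBufferPool code: the class OverdraftLimit, the method name, and the unalignedCheckpointEnabled flag are hypothetical names used for illustration.

```java
/** Sketch of the proposed overdraft limit. All names here are hypothetical. */
final class OverdraftLimit {

    /** Floor taken from the Math.max(numberOfSubpartitions, 10) variant in the mail. */
    static final int MIN_OVERDRAFT_BUFFERS = 10;

    private OverdraftLimit() {}

    /**
     * Returns how many overdraft buffers one local pool may request.
     * Assumption: overdraft is disabled (0) unless Unaligned Checkpoint is enabled.
     */
    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedCheckpointEnabled) {
        if (numberOfSubpartitions < 0) {
            throw new IllegalArgumentException("numberOfSubpartitions must be >= 0");
        }
        if (!unalignedCheckpointEnabled) {
            return 0; // overdraft is not useful for aligned checkpoints
        }
        // Low-parallelism jobs still get at least MIN_OVERDRAFT_BUFFERS.
        return Math.max(numberOfSubpartitions, MIN_OVERDRAFT_BUFFERS);
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(4, true));
        System.out.println(maxOverdraftBuffers(200, true));
        System.out.println(maxOverdraftBuffers(200, false));
    }
}
```

With the floor of 10, a job with 4 subpartitions gets the floor, while a job with 200 subpartitions gets one overdraft buffer per subpartition.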
>>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best wishes
>>>>>>>>>>>>>> fanrui
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>>>>>>>>>>>>>> <kaa....@yandex.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi fanrui,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for creating the FLIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it should help
>>>>>>>>>>>>>>> in the cases described above. Here are my thoughts about the
>>>>>>>>>>>>>>> configuration:
>>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it, right now
>>>>>>>>>>>>>>> we have the following calculation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = network memory (calculated via
>>>>>>>>>>>>>>> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
>>>>>>>>>>>>>>> taskmanager.memory.network.max and the total memory size) /
>>>>>>>>>>>>>>> taskmanager.memory.segment-size.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>>>>>>>>>>>>>>> parallelism + floating buffers) * number of subtasks in the TaskManager
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> buffersInUseNumber = the actual number of buffers in use at the
>>>>>>>>>>>>>>> current moment (always <= requiredBuffersNumber)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to maxBuffersNumber,
>>>>>>>>>>>>>>> which allows Flink to work predictably. But if
>>>>>>>>>>>>>>> requiredBuffersNumber > maxBuffersNumber, that is sometimes also
>>>>>>>>>>>>>>> fine (but not good), since not all required buffers are really
>>>>>>>>>>>>>>> mandatory (e.g. it is OK if Flink cannot allocate floating buffers).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
>>>>>>>>>>>>>>> Flink just never uses these leftover buffers (maxBuffersNumber -
>>>>>>>>>>>>>>> requiredBuffersNumber), which I propose to use. (We can actually even
>>>>>>>>>>>>>>> use the difference 'requiredBuffersNumber - buffersInUseNumber', since
>>>>>>>>>>>>>>> one TaskManager may contain several operators, including 'window',
>>>>>>>>>>>>>>> which can temporarily borrow buffers from the global pool.)
>>>>>>>>>>>>>>>
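The calculation described above can be made concrete with a small sketch. It follows the formulas exactly as the mail states them (and the mail itself asks whether that understanding is correct), so it is not a description of Flink's actual accounting. The class BufferBudget, its method names, and all numbers in main are illustrative assumptions.

```java
/** Worked example of the buffer arithmetic from the mail. Names and numbers are hypothetical. */
final class BufferBudget {

    private BufferBudget() {}

    /** maxBuffersNumber = network memory / segment size. */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffersNumber(
            int exclusiveBuffers, int parallelism, int floatingBuffers, int subtasks) {
        return ((long) exclusiveBuffers * parallelism + floatingBuffers) * subtasks;
    }

    /** Leftover buffers that the mail proposes to use as overdraft (never negative). */
    static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0L, maxBuffers - requiredBuffers);
    }

    public static void main(String[] args) {
        // Assumed values: 128 MiB network memory, 32 KiB segments.
        long max = maxBuffersNumber(128L * 1024 * 1024, 32L * 1024);
        // Assumed values: 2 exclusive buffers, parallelism 100, 8 floating buffers, 4 subtasks.
        long required = requiredBuffersNumber(2, 100, 8, 4);
        System.out.println("maxBuffersNumber      = " + max);
        System.out.println("requiredBuffersNumber = " + required);
        System.out.println("leftover              = " + leftoverBuffers(max, required));
    }
}
```

With these assumed inputs, maxBuffersNumber is 4096 and requiredBuffersNumber is 832, which leaves 3264 buffers that would otherwise stay unused.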
>>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting buffers
>>>>>>>>>>>>>>> while processing a single record; switching to unavailability between
>>>>>>>>>>>>>>> records should stay the same as it is now):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is reached,
>>>>>>>>>>>>>>>   just ignore this limit and allocate the buffer from any place (from
>>>>>>>>>>>>>>>   the LocalBufferPool if it still has something, otherwise from the
>>>>>>>>>>>>>>>   NetworkBufferPool).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * If the LocalBufferPool exceeds its limit, temporarily allocate the
>>>>>>>>>>>>>>>   buffer from the NetworkBufferPool while it has something to allocate.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
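The request order described in the two bullets above can be sketched with plain counters. This is a simplified model, not Flink code: GlobalPool and LocalPool are hypothetical stand-ins for NetworkBufferPool and LocalBufferPool, regular buffers are modelled as already reserved for the local pool, and the maxOverdraft cap is the safety limit suggested later in the thread rather than part of this proposal.

```java
/** Counter-based model of "local pool first, then overdraft from the global pool". */
final class OverdraftRequestSketch {

    /** Hypothetical stand-in for the TaskManager-wide pool: a counter of free segments. */
    static final class GlobalPool {
        private int freeSegments;

        GlobalPool(int freeSegments) {
            this.freeSegments = freeSegments;
        }

        boolean tryTake() {
            if (freeSegments == 0) {
                return false;
            }
            freeSegments--;
            return true;
        }

        int freeSegments() {
            return freeSegments;
        }
    }

    /** Hypothetical stand-in for a per-task pool with a regular limit and an overdraft cap. */
    static final class LocalPool {
        private final GlobalPool global;
        private final int localLimit;
        private final int maxOverdraft;
        private int regularInUse;
        private int overdraftInUse;

        LocalPool(GlobalPool global, int localLimit, int maxOverdraft) {
            this.global = global;
            this.localLimit = localLimit;
            this.maxOverdraft = maxOverdraft;
        }

        /** Returns true if a buffer was granted, either regular or overdraft. */
        boolean requestBuffer() {
            if (regularInUse < localLimit) {
                regularInUse++; // still within the local pool's own limit
                return true;
            }
            if (overdraftInUse < maxOverdraft && global.tryTake()) {
                overdraftInUse++; // limit reached: borrow from the global pool
                return true;
            }
            return false;
        }

        /** Between records the pool counts as unavailable once its regular limit is used up. */
        boolean isAvailable() {
            return regularInUse < localLimit;
        }

        int overdraftInUse() {
            return overdraftInUse;
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(5);
        LocalPool local = new LocalPool(global, 2, 1);
        for (int i = 0; i < 4; i++) {
            System.out.println("request " + i + " granted: " + local.requestBuffer());
        }
        System.out.println("available: " + local.isAvailable());
    }
}
```

In this model the overdraft only helps a record that is already being processed; isAvailable still turns false at the regular limit, which matches the note that switching to unavailability between records stays as it is now.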
>>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I like it
>>>>>>>>>>>>>>> since, on the one hand, it works out of the box without any
>>>>>>>>>>>>>>> configuration and, on the other hand, it can be configured by changing
>>>>>>>>>>>>>>> the proportion of maxBuffersNumber to requiredBuffersNumber.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The last thing I want to say: I don't really want to introduce a new
>>>>>>>>>>>>>>> configuration, since even now it is not clear how to correctly
>>>>>>>>>>>>>>> configure network buffers with the existing configuration, and I don't
>>>>>>>>>>>>>>> want to complicate it, especially if it is possible to resolve the
>>>>>>>>>>>>>>> problem automatically (as described above).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So is my understanding of network memory/buffers correct?
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>> Anton Kalashnikov
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 27.04.2022 07:46, rui fan wrote:
>>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
>>>>>>>>>>>>>>>> effectively solves the problem of checkpoint timeouts or slow
>>>>>>>>>>>>>>>> checkpoints when backpressure is severe.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well when the
>>>>>>>>>>>>>>>> backpressure is severe and multiple output buffers are required to
>>>>>>>>>>>>>>>> process a single record. FLINK-14396 [2] also mentioned this issue
>>>>>>>>>>>>>>>> before. So we propose the overdraft buffer to solve it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft
>>>>>>>>>>>>>>>> buffer mechanism. After discussing with Anton Kalashnikov, there are
>>>>>>>>>>>>>>>> still some points to discuss:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  * There are already a lot of buffer-related configurations. Do we
>>>>>>>>>>>>>>>>    need to add a new configuration for the overdraft buffer?
>>>>>>>>>>>>>>>>  * Where should the overdraft buffer take its memory from?
>>>>>>>>>>>>>>>>  * If the overdraft buffer uses the memory remaining in the
>>>>>>>>>>>>>>>>    NetworkBufferPool, no new configuration needs to be added.
>>>>>>>>>>>>>>>>  * If adding a new configuration:
>>>>>>>>>>>>>>>>      o Should we set the overdraft-memory-size at the TM level or
>>>>>>>>>>>>>>>>        the Task level?
>>>>>>>>>>>>>>>>      o Or set overdraft-buffers to indicate the number of
>>>>>>>>>>>>>>>>        memory-segments that can be overdrawn.
>>>>>>>>>>>>>>>>      o What is the default value? How to set sensible defaults?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Currently, I have implemented a POC [5] and verified it using
>>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at the Task
>>>>>>>>>>>>>>>> level, and the default value is 10. That is, each LocalBufferPool can
>>>>>>>>>>>>>>>> overdraw up to 10 memory-segments.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to your feedback!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> fanrui
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>> Anton Kalashnikov
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Anton Kalashnikov
>>>>>>>>>>
>>>>>>>>>>
>> --
>>
>> Best regards,
>> Anton Kalashnikov
>>
>>