Hi,

Thanks Fanrui, It looks correct for me.

I vote for 5 as 'max-overdraft-buffers-per-gate'.


If I understand correctly, Legacy source can be detected by the operator which is an instance of StreamSource and it is also can be detected by invokable which is an instance of SourceStreamTask. We create ResultPartitionWriters in the constructor of Task and theoretically, we already know the invokable type. So we can create ResultPartitionWriters with BufferPoolFactory which will produce the correct LocalBufferPool. But honestly, it looks a little dirty and I don't actually know what type of invokable we have in case of the chain. But roughly, the idea is to create LocalBufferPool with/without overdraft based on knowledge of operator type.

--

Best regards,
Anton Kalashnikov

05.05.2022 11:49, rui fan пишет:
Hi,

Thanks a lot for your discussion.

After several discussions, I think it's clear now. I updated the
"Proposed Changes" of FLIP-227[1]. If I have something
missing, please help to add it to FLIP, or add it in the mail
and I can add it to FLIP. If everything is OK, I will create a
new JIRA for the first task, and use FLINK-26762[2] as the
second task.

About the legacy source, do we set maxOverdraftBuffersPerGate=0
directly? How to identify legacySource? Or could we add
the overdraftEnabled in LocalBufferPool? The default value
is false. If the getAvailableFuture is called, change overdraftEnabled=true.
It indicates whether there are checks isAvailable elsewhere.
It might be more general, it can cover more cases.

Also, I think the default value of 'max-overdraft-buffers-per-gate'
needs to be confirmed. I prefer it to be between 5 and 10. How
do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[2] https://issues.apache.org/jira/browse/FLINK-26762

Thanks
fanrui

On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

    Hi again,

    After sleeping over this, if both versions (reserve and overdraft)
    have the same complexity, I would also prefer the overdraft.

    > `Integer.MAX_VALUE` as default value was my idea as well but now, as
    > Dawid mentioned, I think it is dangerous since it is too
    implicit for
    > the user and if the user submits one more job for the same
    TaskManger

    As I mentioned, it's not only an issue with multiple jobs. The
    same problem can happen with different subtasks from the same job,
    potentially leading to the FLINK-13203 deadlock [1]. With
    FLINK-13203 fixed, I would be in favour of Integer.MAX_VALUE to be
    the default value, but as it is, I think we should indeed play on
    the safe side and limit it.

    > I still don't understand how should be limited "reserve"
    implementation.
    > I mean if we have X buffers in total and the user sets overdraft
    equal
    > to X we obviously can not reserve all buffers, but how many we are
    > allowed to reserve? Should it be a different configuration like
    > percentegeForReservedBuffers?

    The reserve could be defined as percentage, or as a fixed number
    of buffers. But yes. In normal operation subtask would not use the
    reserve, as if numberOfAvailableBuffers < reserve, the output
    would be not available. Only in the flatMap/timers/huge records
    case the reserve could be used.

    > 1. If the total buffers of LocalBufferPool <= the reserve
    buffers, will LocalBufferPool never be available? Can't process data?

    Of course we would need to make sure that never happens. So the
    reserve should be < total buffer size.

    > 2. If the overdraft buffer use the extra buffers, when the
    downstream
    > task inputBuffer is insufficient, it should fail to start the
    job, and then
    > restart? When the InputBuffer is initialized, it will apply for
    enough
    > buffers, right?

    The failover if downstream can not allocate buffers is already
    implemented FLINK-14872 [2]. There is a timeout for how long the
    task is waiting for buffer allocation. However this doesn't
    prevent many (potentially infinitely many) deadlock/restarts
    cycles. IMO the propper solution for [1] would be 2b described in
    the ticket:

    > 2b. Assign extra buffers only once all of the tasks are RUNNING.
    This is a simplified version of 2a, without tracking the tasks
    sink-to-source.

    But that's a pre-existing problem and I don't think we have to
    solve it before implementing overdraft. I think we would need to
    solve it only before setting Integer.MAX_VALUE as the default for
    the overdraft. Maybe I would hesitate setting the overdraft to
    anything more then a couple of buffers by default for the same reason.

    > Actually, I totally agree that we don't need a lot of buffers
    for overdraft

    and

    > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
    > When we finish this feature and after users use it, if users
    feedback
    > this issue we can discuss again.

    +1

    Piotrek

    [1] https://issues.apache.org/jira/browse/FLINK-13203
    [2] https://issues.apache.org/jira/browse/FLINK-14872

    czw., 5 maj 2022 o 05:52 rui fan <1996fan...@gmail.com> napisał(a):

        Hi everyone,

        I still have some questions.

        1. If the total buffers of LocalBufferPool <= the reserve
        buffers, will
        LocalBufferPool never be available? Can't process data?
        2. If the overdraft buffer use the extra buffers, when the
        downstream
        task inputBuffer is insufficient, it should fail to start the
        job, and then
        restart? When the InputBuffer is initialized, it will apply
        for enough
        buffers, right?

        Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
        When we finish this feature and after users use it, if users
        feedback
        this issue we can discuss again.

        Thanks
        fanrui

        On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz
        <dwysakow...@apache.org> wrote:

            Hey all,

            I have not replied in the thread yet, but I was following
            the discussion.

            Personally, I like Fanrui's and Anton's idea. As far as I
            understand it
            the idea to distinguish between inside flatMap & outside
            would be fairly
            simple, but maybe slightly indirect. The checkAvailability
            would remain
            unchanged and it is checked always between separate
            invocations of the
            UDF. Therefore the overdraft buffers would not apply
            there. However once
            the pool says it is available, it means it has at least an
            initial
            buffer. So any additional request without checking for
            availability can
            be considered to be inside of processing a single record.
            This does not
            hold just for the LegacySource as I don't think it
            actually checks for
            the availability of buffers in the LocalBufferPool.

            In the offline chat with Anton, we also discussed if we
            need a limit of
            the number of buffers we could overdraft (or in other
            words if the limit
            should be equal to Integer.MAX_VALUE), but personally I'd
            prefer to stay
            on the safe side and have it limited. The pool of network
            buffers is
            shared for the entire TaskManager, so it means it can be
            shared even
            across tasks of separate jobs. However, I might be just
            unnecessarily
            cautious here.

            Best,

            Dawid

            On 04/05/2022 10:54, Piotr Nowojski wrote:
            > Hi,
            >
            > Thanks for the answers.
            >
            >> we may still need to discuss whether the
            >> overdraft/reserve/spare should use extra buffers or buffers
            >> in (exclusive + floating buffers)?
            > and
            >
            >> These things resolve the different problems (at least
            as I see that).
            >> The current hardcoded "1"  says that we switch
            "availability" to
            >> "unavailability" when one more buffer is left(actually
            a little less
            >> than one buffer since we write the last piece of data
            to this last
            >> buffer). The overdraft feature doesn't change this
            logic we still want
            >> to switch to "unavailability" in such a way but if we
            are already in
            >> "unavailability" and we want more buffers then we can
            take "overdraft
            >> number" more. So we can not avoid this hardcoded "1"
            since we need to
            >> understand when we should switch to "unavailability"
            > Ok, I see. So it seems to me that both of you have in
            mind to keep the
            > buffer pools as they are right now, but if we are in the
            middle of
            > processing a record, we can request extra overdraft
            buffers on top of
            > those? This is another way to implement the overdraft to
            what I was
            > thinking. I was thinking about something like keeping
            the "overdraft" or
            > more precisely buffer "reserve" in the buffer pool. I
            think my version
            > would be easier to implement, because it is just
            fiddling with min/max
            > buffers calculation and slightly modified
            `checkAvailability()` logic.
            >
            > On the other hand  what you have in mind would better
            utilise the available
            > memory, right? It would require more code changes (how
            would we know when
            > we are allowed to request the overdraft?). However, in
            this case, I would
            > be tempted to set the number of overdraft buffers by
            default to
            > `Integer.MAX_VALUE`, and let the system request as many
            buffers as
            > necessary. The only downside that I can think of (apart
            of higher
            > complexity) would be higher chance of hitting a
            known/unsolved deadlock [1]
            > in a scenario:
            > - downstream task hasn't yet started
            > - upstream task requests overdraft and uses all
            available memory segments
            > from the global pool
            > - upstream task is blocked, because downstream task
            hasn't started yet and
            > can not consume any data
            > - downstream task tries to start, but can not, as there
            are no available
            > buffers
            >
            >> BTW, for watermark, the number of buffers it needs is
            >> numberOfSubpartitions. So if
            overdraftBuffers=numberOfSubpartitions,
            >> the watermark won't block in requestMemory.
            > and
            >
            >> the best overdraft size will be equal to parallelism.
            > That's a lot of buffers. I don't think we need that many
            for broadcasting
            > watermarks. Watermarks are small, and remember that
            every subpartition has
            > some partially filled/empty WIP buffer, so the vast
            majority of
            > subpartitions will not need to request a new buffer.
            >
            > Best,
            > Piotrek
            >
            > [1] https://issues.apache.org/jira/browse/FLINK-13203
            >
            > wt., 3 maj 2022 o 17:15 Anton Kalashnikov
            <kaa....@yandex.com> napisał(a):
            >
            >> Hi,
            >>
            >>
            >>   >> Do you mean to ignore it while processing records,
            but keep using
            >> `maxBuffersPerChannel` when calculating the
            availability of the output?
            >>
            >>
            >> Yes, it is correct.
            >>
            >>
            >>   >> Would it be a big issue if we changed it to check
            if at least
            >> "overdraft number of buffers are available", where
            "overdraft number" is
            >> configurable, instead of the currently hardcoded value
            of "1"?
            >>
            >>
            >> These things resolve the different problems (at least
            as I see that).
            >> The current hardcoded "1"  says that we switch
            "availability" to
            >> "unavailability" when one more buffer is left(actually
            a little less
            >> than one buffer since we write the last piece of data
            to this last
            >> buffer). The overdraft feature doesn't change this
            logic we still want
            >> to switch to "unavailability" in such a way but if we
            are already in
            >> "unavailability" and we want more buffers then we can
            take "overdraft
            >> number" more. So we can not avoid this hardcoded "1"
            since we need to
            >> understand when we should switch to "unavailability"
            >>
            >>
            >> -- About "reserve" vs "overdraft"
            >>
            >> As Fanrui mentioned above, perhaps, the best overdraft
            size will be
            >> equal to parallelism. Also, the user can set any value
            he wants. So even
            >> if parallelism is small(~5) but the user's flatmap
            produces a lot of
            >> data, the user can set 10 or even more. Which almost
            double the max
            >> buffers and it will be impossible to reserve. At least
            we need to figure
            >> out how to protect from such cases (the limit for an
            overdraft?). So
            >> actually it looks even more difficult than increasing
            the maximum buffers.
            >>
            >> I want to emphasize that overdraft buffers are soft
            configuration which
            >> means it takes as many buffers as the global buffers
            pool has
            >> available(maybe zero) but less than this configured
            value. It is also
            >> important to notice that perhaps, not many subtasks in
            TaskManager will
            >> be using this feature so we don't actually need a lot
            of available
            >> buffers for every subtask(Here, I mean that if we have
            only one
            >> window/flatmap operator and many other operators, then
            one TaskManager
            >> will have many ordinary subtasks which don't actually
            need overdraft and
            >> several subtasks that needs this feature). But in case
            of reservation,
            >> we will reserve some buffers for all operators even if
            they don't really
            >> need it.
            >>
            >>
            >> -- Legacy source problem
            >>
            >> If we still want to change max buffers then it is
            problem for
            >> LegacySources(since every subtask of source will always
            use these
            >> overdraft). But right now, I think that we can force to
            set 0 overdraft
            >> buffers for legacy subtasks in configuration during
            execution(if it is
            >> not too late for changing configuration in this place).
            >>
            >>
            >> 03.05.2022 14:11, rui fan пишет:
            >>> Hi
            >>>
            >>> Thanks for Martijn Visser and Piotrek's feedback.  I
            agree with
            >>> ignoring the legacy source, it will affect our design.
            User should
            >>> use the new Source Api as much as possible.
            >>>
            >>> Hi Piotrek, we may still need to discuss whether the
            >>> overdraft/reserve/spare should use extra buffers or
            buffers
            >>> in (exclusive + floating buffers)? They have some
            differences.
            >>>
            >>> If it uses extra buffers:
            >>> 1.The LocalBufferPool will be available when
            (usedBuffers + 1
            >>>    <= currentPoolSize) and all subpartitions don't
            reach the
            >>> maxBuffersPerChannel.
            >>>
            >>> If it uses the buffers in (exclusive + floating buffers):
            >>> 1. The LocalBufferPool will be available when
            (usedBuffers +
            >>> overdraftBuffers <= currentPoolSize) and all subpartitions
            >>> don't reach the maxBuffersPerChannel.
            >>> 2. For low parallelism jobs, if overdraftBuffers is
            large(>8), the
            >>> usedBuffers will be small. That is the LocalBufferPool
            will be
            >>> easily unavailable. For throughput, if users turn up the
            >>> overdraft buffers, they need to turn up exclusive or
            floating
            >>> buffers. It also affects the InputChannel, and it's is
            unfriendly
            >>> to users.
            >>>
            >>> So I prefer the overdraft to use extra buffers.
            >>>
            >>>
            >>> BTW, for watermark, the number of buffers it needs is
            >>> numberOfSubpartitions. So if
            overdraftBuffers=numberOfSubpartitions,
            >>> the watermark won't block in requestMemory. But it has
            >>> 2 problems:
            >>> 1. It needs more overdraft buffers. If the overdraft uses
            >>> (exclusive + floating buffers),  there will be fewer
            buffers
            >>> available. Throughput may be affected.
            >>> 2. The numberOfSubpartitions is different for each Task.
            >>> So if users want to cover watermark using this feature,
            >>> they don't know how to set the overdraftBuffers more r
            >>> easonably. And if the parallelism is changed, users still
            >>> need to change overdraftBuffers. It is unfriendly to
            users.
            >>>
            >>> So I propose we support overdraftBuffers=-1, It means
            >>> we will automatically set
            overdraftBuffers=numberOfSubpartitions
            >>> in the Constructor of LocalBufferPool.
            >>>
            >>> Please correct me if I'm wrong.
            >>>
            >>> Thanks
            >>> fanrui
            >>>
            >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski
            <pnowoj...@apache.org>
            >> wrote:
            >>>> Hi fanrui,
            >>>>
            >>>>> Do you mean don't add the extra buffers? We just use
            (exclusive
            >> buffers *
            >>>>> parallelism + floating buffers)? The LocalBufferPool
            will be available
            >>>> when
            >>>>> (usedBuffers+overdraftBuffers <=
            >>>> exclusiveBuffers*parallelism+floatingBuffers)
            >>>>> and all subpartitions don't reach the
            maxBuffersPerChannel, right?
            >>>> I'm not sure. Definitely we would need to adjust the
            minimum number of
            >> the
            >>>> required buffers, just as we did when we were
            implementing the non
            >> blocking
            >>>> outputs and adding availability logic to
            LocalBufferPool. Back then we
            >>>> added "+ 1" to the minimum number of buffers.
            Currently this logic is
            >>>> located
            NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
            >>>>
            >>>>> int min = isSortShuffle ? sortShuffleMinBuffers :
            numSubpartitions + 1;
            >>>> For performance reasons, we always require at least
            one buffer per
            >>>> sub-partition. Otherwise performance falls
            drastically. Now if we
            >> require 5
            >>>> overdraft buffers for output to be available, we need
            to have them on
            >> top
            >>>> of those "one buffer per sub-partition". So the logic
            should be changed
            >> to:
            >>>>> int min = isSortShuffle ? sortShuffleMinBuffers :
            numSubpartitions +
            >>>> numOverdraftBuffers;
            >>>>
            >>>> Regarding increasing the number of max buffers I'm
            not sure. As long as
            >>>> "overdraft << max number of buffers", because all
            buffers on the outputs
            >>>> are shared across all sub-partitions. If we have 5
            overdraft buffers,
            >> and
            >>>> parallelism of 100, it doesn't matter in the grand
            scheme of things if
            >> we
            >>>> make the output available if at least one single
            buffer is available or
            >> at
            >>>> least 5 buffers are available out of ~200 (100 * 2 +
            8). So effects of
            >>>> increasing the overdraft from 1 to for example 5
            should be negligible.
            >> For
            >>>> small parallelism, like 5, increasing overdraft from
            1 to 5 still
            >> increases
            >>>> the overdraft by only about 25%. So maybe we can keep
            the max as it is?
            >>>>
            >>>> If so, maybe we should change the name from
            "overdraft" to "buffer
            >> reserve"
            >>>> or "spare buffers"? And document it as "number of
            buffers kept in
            >> reserve
            >>>> in case of flatMap/firing timers/huge records"?
            >>>>
            >>>> What do you think Fenrui, Anton?
            >>>>
            >>>> Re LegacySources. I agree we can kind of ignore them
            in the new
            >> features,
            >>>> as long as we don't brake the existing deployments
            too much.
            >>>>
            >>>> Best,
            >>>> Piotrek
            >>>>
            >>>> wt., 3 maj 2022 o 09:20 Martijn Visser
            <mart...@ververica.com>
            >> napisał(a):
            >>>>> Hi everyone,
            >>>>>
            >>>>> Just wanted to chip in on the discussion of legacy
            sources: IMHO, we
            >>>> should
            >>>>> not focus too much on improving/adding capabilities
            for legacy sources.
            >>>> We
            >>>>> want to persuade and push users to use the new
            Source API. Yes, this
            >>>> means
            >>>>> that there's work required by the end users to port
            any custom source
            >> to
            >>>>> the new interface. The benefits of the new Source
            API should outweigh
            >>>> this.
            >>>>> Anything that we build to support multiple
            interfaces means adding more
            >>>>> complexity and more possibilities for bugs. Let's
            try to make our
            >> lives a
            >>>>> little bit easier.
            >>>>>
            >>>>> Best regards,
            >>>>>
            >>>>> Martijn Visser
            >>>>> https://twitter.com/MartijnVisser82
            >>>>> https://github.com/MartijnVisser
            >>>>>
            >>>>>
            >>>>> On Tue, 3 May 2022 at 07:50, rui fan
            <1996fan...@gmail.com> wrote:
            >>>>>
            >>>>>> Hi Piotrek
            >>>>>>
            >>>>>>> Do you mean to ignore it while processing records,
            but keep using
            >>>>>>> `maxBuffersPerChannel` when calculating the
            availability of the
            >>>> output?
            >>>>>> I think yes, and please Anton Kalashnikov to help
            double check.
            >>>>>>
            >>>>>>> +1 for just having this as a separate
            configuration. Is it a big
            >>>>> problem
            >>>>>>> that legacy sources would be ignoring it? Note
            that we already have
            >>>>>>> effectively hardcoded a single overdraft buffer.
            >>>>>>> `LocalBufferPool#checkAvailability` checks if
            there is a single
            >>>> buffer
            >>>>>>> available and this works the same for all tasks
            (including legacy
            >>>>> source
            >>>>>>> tasks). Would it be a big issue if we changed it
            to check if at least
            >>>>>>> "overdraft number of buffers are available", where
            "overdraft number"
            >>>>> is
            >>>>>>> configurable, instead of the currently hardcoded
            value of "1"?
            >>>>>> Do you mean don't add the extra buffers? We just
            use (exclusive
            >>>> buffers *
            >>>>>> parallelism + floating buffers)? The
            LocalBufferPool will be available
            >>>>> when
            >>>>>> (usedBuffers+overdraftBuffers <=
            >>>>>> exclusiveBuffers*parallelism+floatingBuffers)
            >>>>>> and all subpartitions don't reach the
            maxBuffersPerChannel, right?
            >>>>>>
            >>>>>> If yes, I think it can solve the problem of legacy
            source. There may
            >> be
            >>>>>> some impact. If overdraftBuffers is large and only
            one buffer is used
            >>>> to
            >>>>>> process a single record, exclusive
            buffers*parallelism + floating
            >>>> buffers
            >>>>>> cannot be used. It may only be possible to use
            (exclusive buffers *
            >>>>>> parallelism
            >>>>>> + floating buffers - overdraft buffers + 1). For
            throughput, if turn
            >> up
            >>>>> the
            >>>>>> overdraft buffers, the flink user needs to turn up
            exclusive or
            >>>> floating
            >>>>>> buffers. And it also affects the InputChannel.
            >>>>>>
            >>>>>> If not, I don't think it can solve the problem of
            legacy source. The
            >>>>> legacy
            >>>>>> source don't check isAvailable, If there are the
            extra buffers, legacy
            >>>>>> source
            >>>>>> will use them up until block in requestMemory.
            >>>>>>
            >>>>>>
            >>>>>> Thanks
            >>>>>> fanrui
            >>>>>>
            >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski
            <pnowoj...@apache.org>
            >>>>>> wrote:
            >>>>>>
            >>>>>>> Hi,
            >>>>>>>
            >>>>>>> +1 for the general proposal from my side. It would
            be a nice
            >>>> workaround
            >>>>>>> flatMaps, WindowOperators and large records issues
            with unaligned
            >>>>>>> checkpoints.
            >>>>>>>
            >>>>>>>> The first task is about ignoring max buffers per
            channel. This
            >>>> means
            >>>>> if
            >>>>>>>> we request a memory segment from LocalBufferPool
            and the
            >>>>>>>> maxBuffersPerChannel is reached for this channel,
            we just ignore
            >>>> that
            >>>>>>>> and continue to allocate buffer while
            LocalBufferPool has it(it is
            >>>>>>>> actually not a overdraft).
            >>>>>>> Do you mean to ignore it while processing records,
            but keep using
            >>>>>>> `maxBuffersPerChannel` when calculating the
            availability of the
            >>>> output?
            >>>>>>>> The second task is about the real overdraft. I am
            pretty convinced
            >>>>> now
            >>>>>>>> that we, unfortunately, need configuration for
            limitation of
            >>>>> overdraft
            >>>>>>>> number(because it is not ok if one subtask
            allocates all buffers of
            >>>>> one
            >>>>>>>> TaskManager considering that several different
            jobs can be
            >>>> submitted
            >>>>> on
            >>>>>>>> this TaskManager). So idea is to have
            >>>>>>>> maxOverdraftBuffersPerPartition(technically to
            say per
            >>>>>> LocalBufferPool).
            >>>>>>>> In this case, when a limit of buffers in
            LocalBufferPool is
            >>>> reached,
            >>>>>>>> LocalBufferPool can request additionally from
            NetworkBufferPool up
            >>>> to
            >>>>>>>> maxOverdraftBuffersPerPartition buffers.
            >>>>>>> +1 for just having this as a separate
            configuration. Is it a big
            >>>>> problem
            >>>>>>> that legacy sources would be ignoring it? Note
            that we already have
            >>>>>>> effectively hardcoded a single overdraft buffer.
            >>>>>>> `LocalBufferPool#checkAvailability` checks if
            there is a single
            >>>> buffer
            >>>>>>> available and this works the same for all tasks
            (including legacy
            >>>>> source
            >>>>>>> tasks). Would it be a big issue if we changed it
            to check if at least
            >>>>>>> "overdraft number of buffers are available", where
            "overdraft number"
            >>>>> is
            >>>>>>> configurable, instead of the currently hardcoded
            value of "1"?
            >>>>>>>
            >>>>>>> Best,
            >>>>>>> Piotrek
            >>>>>>>
            >>>>>>> pt., 29 kwi 2022 o 17:04 rui fan
            <1996fan...@gmail.com> napisał(a):
            >>>>>>>
            >>>>>>>> Let me add some information about the LegacySource.
            >>>>>>>>
            >>>>>>>> If we want to disable the overdraft buffer for
            LegacySource.
            >>>>>>>> Could we add the enableOverdraft in LocalBufferPool?
            >>>>>>>> The default value is false. If the
            getAvailableFuture is called,
            >>>>>>>> change enableOverdraft=true. It indicates whether
            there are
            >>>>>>>> checks isAvailable elsewhere.
            >>>>>>>>
            >>>>>>>> I don't think it is elegant, but it's safe.
            Please correct me if
            >>>> I'm
            >>>>>>> wrong.
            >>>>>>>> Thanks
            >>>>>>>> fanrui
            >>>>>>>>
            >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan
            <1996fan...@gmail.com>
            >>>>> wrote:
            >>>>>>>>> Hi,
            >>>>>>>>>
            >>>>>>>>> Thanks for your quick response.
            >>>>>>>>>
            >>>>>>>>> For question 1/2/3, we think they are clear. We
            just need to
            >>>>> discuss
            >>>>>>> the
            >>>>>>>>> default value in PR.
            >>>>>>>>>
            >>>>>>>>> For the legacy source, you are right. It's
            difficult for general
            >>>>>>>>> implementation.
            >>>>>>>>> Currently, we implement
            ensureRecordWriterIsAvailable() in
            >>>>>>>>> SourceFunction.SourceContext. And call it in our
            common
            >>>>> LegacySource,
            >>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink
            jobs consume
            >>>> kafka,
            >>>>> so
            >>>>>>>>> fixing FlinkKafkaConsumer solved most of our
            problems.
            >>>>>>>>>
            >>>>>>>>> Core code:
            >>>>>>>>> ```
            >>>>>>>>> public void ensureRecordWriterIsAvailable() {
            >>>>>>>>>        if (recordWriter == null
            >>>>>>>>>             ||
            >>>>>>>>>
            >>
            
!configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED,
            >>>>>>>>> false)
            >>>>>>>>>             || recordWriter.isAvailable()) {
            >>>>>>>>>  return;
            >>>>>>>>>        }
            >>>>>>>>>
            >>>>>>>>> CompletableFuture<?> resumeFuture =
            >>>>>>>> recordWriter.getAvailableFuture();
            >>>>>>>>>        try {
            >>>>>>>>>  resumeFuture.get();
            >>>>>>>>>        } catch (Throwable ignored) {
            >>>>>>>>>        }
            >>>>>>>>> }
            >>>>>>>>> ```
            >>>>>>>>>
            >>>>>>>>> LegacySource calls
            sourceContext.ensureRecordWriterIsAvailable()
            >>>>>>>>> before synchronized (checkpointLock) and
            collects records.
            >>>>>>>>> Please let me know if there is a better solution.
            >>>>>>>>>
            >>>>>>>>> Thanks
            >>>>>>>>> fanrui
            >>>>>>>>>
            >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
            >>>>>> kaa....@yandex.com>
            >>>>>>>>> wrote:
            >>>>>>>>>
            >>>>>>>>>> Hi.
            >>>>>>>>>>
            >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or
            two PRs or two
            >>>>>> commits
            >>>>>>>> in a
            >>>>>>>>>>       PR?
            >>>>>>>>>>
            >>>>>>>>>> Perhaps, the separated ticket will be better
            since this task has
            >>>>>> fewer
            >>>>>>>>>> questions but we should find a solution for
            LegacySource first.
            >>>>>>>>>>
            >>>>>>>>>> --  2. For the first task, if the flink user
            disables the
            >>>>> Unaligned
            >>>>>>>>>>  Checkpoint, do we ignore max buffers per
            channel? Because
            >>>> the
            >>>>>>>>>> overdraft
            >>>>>>>>>>       isn't useful for the Aligned Checkpoint,
            it still needs to
            >>>>> wait
            >>>>>>> for
            >>>>>>>>>>  downstream Task to consume.
            >>>>>>>>>>
            >>>>>>>>>> I think that the logic should be the same for
            AC and UC. As I
            >>>>>>>> understand,
            >>>>>>>>>> the overdraft maybe is not really helpful for
            AC but it doesn't
            >>>>> make
            >>>>>>> it
            >>>>>>>>>> worse as well.
            >>>>>>>>>>
            >>>>>>>>>>     3. For the second task
            >>>>>>>>>> --      - The default value of
            maxOverdraftBuffersPerPartition
            >>>> may
            >>>>>>> also
            >>>>>>>>>> need
            >>>>>>>>>>          to be discussed.
            >>>>>>>>>>
            >>>>>>>>>> I think it should be a pretty small value or
            even 0 since it
            >>>> kind
            >>>>> of
            >>>>>>>>>> optimization and user should understand what
            they do(especially
            >>>> if
            >>>>>> we
            >>>>>>>>>> implement the first task).
            >>>>>>>>>>
            >>>>>>>>>> --      - If the user disables the Unaligned
            Checkpoint, can we
            >>>>> set
            >>>>>>> the
            >>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the
            overdraft
            >>>>>> isn't
            >>>>>>>>>> useful for
            >>>>>>>>>>          the Aligned Checkpoint.
            >>>>>>>>>>
            >>>>>>>>>> The same answer that above, if the overdraft
            doesn't make
            >>>>>> degradation
            >>>>>>>> for
            >>>>>>>>>> the Aligned Checkpoint I don't think that we
            should make
            >>>>> difference
            >>>>>>>> between
            >>>>>>>>>> AC and UC.
            >>>>>>>>>>
            >>>>>>>>>>       4. For the legacy source
            >>>>>>>>>> --      - If enabling the Unaligned Checkpoint,
            it uses up to
            >>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
            >>>>>>>>>>          - If disabling the UC, it doesn't use
            the overdraft
            >>>> buffer.
            >>>>>>>>>>          - Do you think it's ok?
            >>>>>>>>>>
            >>>>>>>>>> Ideally, I don't want to use overdraft for
            LegacySource at all
            >>>>> since
            >>>>>>> it
            >>>>>>>>>> can lead to undesirable results especially if
            the limit is high.
            >>>>> At
            >>>>>>>> least,
            >>>>>>>>>> as I understand, it will always work in
            overdraft mode and it
            >>>> will
            >>>>>>>> borrow
            >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from
            the global pool
            >>>> which
            >>>>>> can
            >>>>>>>> lead
            >>>>>>>>>> to degradation of other subtasks on the same
            TaskManager.
            >>>>>>>>>>
            >>>>>>>>>> --      - Actually, we added the checkAvailable
            logic for
            >>>>>> LegacySource
            >>>>>>>> in
            >>>>>>>>>> our
            >>>>>>>>>> internal version. It works well.
            >>>>>>>>>>
            >>>>>>>>>> I don't really understand how it is possible
            for general case
            >>>>>>>> considering
            >>>>>>>>>> that each user has their own implementation of
            >>>>> LegacySourceOperator
            >>>>>>>>>> --   5. For the benchmark, do you have any
            suggestions? I
            >>>>> submitted
            >>>>>>> the
            >>>>>>>> PR
            >>>>>>>>>>       [1].
            >>>>>>>>>>
            >>>>>>>>>> I haven't looked at it yet, but I'll try to do
            it soon.
            >>>>>>>>>>
            >>>>>>>>>>
            >>>>>>>>>> 29.04.2022 14:14, rui fan пишет:
            >>>>>>>>>>> Hi,
            >>>>>>>>>>>
            >>>>>>>>>>> Thanks for your feedback. I have a servel of
            questions.
            >>>>>>>>>>>
            >>>>>>>>>>>       1. Do you mean split this into two JIRAs
            or two PRs or two
            >>>>>>> commits
            >>>>>>>>>> in a
            >>>>>>>>>>>       PR?
            >>>>>>>>>>>       2. For the first task, if the flink user
            disables the
            >>>>>> Unaligned
            >>>>>>>>>>>  Checkpoint, do we ignore max buffers per
            channel? Because
            >>>>> the
            >>>>>>>>>> overdraft
            >>>>>>>>>>>  isn't useful for the Aligned Checkpoint, it
            still needs to
            >>>>>> wait
            >>>>>>>> for
            >>>>>>>>>>>  downstream Task to consume.
            >>>>>>>>>>>       3. For the second task
            >>>>>>>>>>> - The default value of
            maxOverdraftBuffersPerPartition
            >>>>> may
            >>>>>>> also
            >>>>>>>>>> need
            >>>>>>>>>>> to be discussed.
            >>>>>>>>>>> - If the user disables the Unaligned
            Checkpoint, can we
            >>>>> set
            >>>>>>> the
            >>>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the
            >>>> overdraft
            >>>>>>> isn't
            >>>>>>>>>> useful for
            >>>>>>>>>>> the Aligned Checkpoint.
            >>>>>>>>>>>       4. For the legacy source
            >>>>>>>>>>> - If enabling the Unaligned Checkpoint, it
            uses up to
            >>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
            >>>>>>>>>>> - If disabling the UC, it doesn't use the
            overdraft
            >>>>> buffer.
            >>>>>>>>>>> - Do you think it's ok?
            >>>>>>>>>>> - Actually, we added the checkAvailable logic for
            >>>>>>> LegacySource
            >>>>>>>>>> in our
            >>>>>>>>>>> internal version. It works well.
            >>>>>>>>>>>       5. For the benchmark, do you have any
            suggestions? I
            >>>>> submitted
            >>>>>>> the
            >>>>>>>>>> PR
            >>>>>>>>>>>  [1].
            >>>>>>>>>>>
            >>>>>>>>>>> [1]
            https://github.com/apache/flink-benchmarks/pull/54
            >>>>>>>>>>>
            >>>>>>>>>>> Thanks
            >>>>>>>>>>> fanrui
            >>>>>>>>>>>
            >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton
            Kalashnikov <
            >>>>>>> kaa....@yandex.com
            >>>>>>>>>>> wrote:
            >>>>>>>>>>>
            >>>>>>>>>>>> Hi,
            >>>>>>>>>>>>
            >>>>>>>>>>>> We discuss about it a little with Dawid
            Wysakowicz. Here is
            >>>>> some
            >>>>>>>>>>>> conclusion:
            >>>>>>>>>>>>
            >>>>>>>>>>>> First of all, let's split this into two tasks.
            >>>>>>>>>>>>
            >>>>>>>>>>>> The first task is about ignoring max buffers
            per channel.
            >>>> This
            >>>>>>> means
            >>>>>>>> if
            >>>>>>>>>>>> we request a memory segment from
            LocalBufferPool and the
            >>>>>>>>>>>> maxBuffersPerChannel is reached for this
            channel, we just
            >>>>> ignore
            >>>>>>> that
            >>>>>>>>>>>> and continue to allocate buffer while
            LocalBufferPool has
            >>>> it(it
            >>>>>> is
            >>>>>>>>>>>> actually not a overdraft).
            >>>>>>>>>>>>
            >>>>>>>>>>>> The second task is about the real overdraft.
            I am pretty
            >>>>>> convinced
            >>>>>>>> now
            >>>>>>>>>>>> that we, unfortunately, need configuration
            for limitation of
            >>>>>>>> overdraft
            >>>>>>>>>>>> number(because it is not ok if one subtask
            allocates all
            >>>>> buffers
            >>>>>> of
            >>>>>>>> one
            >>>>>>>>>>>> TaskManager considering that several
            different jobs can be
            >>>>>>> submitted
            >>>>>>>> on
            >>>>>>>>>>>> this TaskManager). So idea is to have
            >>>>>>>>>>>> maxOverdraftBuffersPerPartition(technically
            to say per
            >>>>>>>>>> LocalBufferPool).
            >>>>>>>>>>>> In this case, when a limit of buffers in
            LocalBufferPool is
            >>>>>>> reached,
            >>>>>>>>>>>> LocalBufferPool can request additionally from
            >>>> NetworkBufferPool
            >>>>>> up
            >>>>>>> to
            >>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
            >>>>>>>>>>>>
            >>>>>>>>>>>>
            >>>>>>>>>>>> But it is still not clear how to handle
            LegacySource since it
            >>>>>>>> actually
            >>>>>>>>>>>> works as unlimited flatmap and it will always
            work in
            >>>> overdraft
            >>>>>>> mode
            >>>>>>>>>>>> which is not a target. So we still need to
            think about that.
            >>>>>>>>>>>>
            >>>>>>>>>>>>
            >>>>>>>>>>>> 29.04.2022 11:11, rui fan пишет:
            >>>>>>>>>>>>> Hi Anton Kalashnikov,
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> I think you agree with we should limit the
            maximum number of
            >>>>>>>> overdraft
            >>>>>>>>>>>>> segments that each LocalBufferPool can apply
            for, right?
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> I prefer to hard code the
            maxOverdraftBuffers due to don't
            >>>> add
            >>>>>> the
            >>>>>>>> new
            >>>>>>>>>>>>> configuration. And I hope to hear more from
            the community.
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> Best wishes
            >>>>>>>>>>>>> fanrui
            >>>>>>>>>>>>>
            >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <
            >>>>> 1996fan...@gmail.com>
            >>>>>>>>>> wrote:
            >>>>>>>>>>>>>> Hi Anton Kalashnikov,
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> Thanks for your very clear reply, I think
            you are totally
            >>>>>> right.
            >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber'
            can be used as
            >>>>> the
            >>>>>>>>>>>>>> overdraft buffer, it won't need the new buffer
            >>>>>>> configuration.Flink
            >>>>>>>>>> users
            >>>>>>>>>>>>>> can turn up the maxBuffersNumber to control
            the overdraft
            >>>>>> buffer
            >>>>>>>>>> size.
            >>>>>>>>>>>>>> Also, I‘d like to add some information. For
            safety, we
            >>>> should
            >>>>>>> limit
            >>>>>>>>>> the
            >>>>>>>>>>>>>> maximum number of overdraft segments that each
            >>>>> LocalBufferPool
            >>>>>>>>>>>>>> can apply for.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> Why do we limit it?
            >>>>>>>>>>>>>> Some operators don't check the
            `recordWriter.isAvailable`
            >>>>>> during
            >>>>>>>>>>>>>> processing records, such as LegacySource. I
            have mentioned
            >>>> it
            >>>>>> in
            >>>>>>>>>>>>>> FLINK-26759 [1]. I'm not sure if there are
            other cases.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> If don't add the limitation, the
            LegacySource will use up
            >>>> all
            >>>>>>>>>> remaining
            >>>>>>>>>>>>>> memory in the NetworkBufferPool when the
            backpressure is
            >>>>>> severe.
            >>>>>>>>>>>>>> How to limit it?
            >>>>>>>>>>>>>> I prefer to hard code the
            >>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions`
            >>>>>>>>>>>>>> in the constructor of LocalBufferPool. The
            >>>>> maxOverdraftBuffers
            >>>>>> is
            >>>>>>>>>> just
            >>>>>>>>>>>>>> for safety, and it should be enough for
            most flink jobs. Or
            >>>>> we
            >>>>>>> can
            >>>>>>>>>> set
            >>>>>>>>>>>>>>
            `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
            >>>> to
            >>>>>>> handle
            >>>>>>>>>>>>>> some jobs of low parallelism.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> Also if user don't enable the Unaligned
            Checkpoint, we can
            >>>>> set
            >>>>>>>>>>>>>> maxOverdraftBuffers=0 in the constructor of
            >>>> LocalBufferPool.
            >>>>>>>> Because
            >>>>>>>>>>>>>> the overdraft isn't useful for the Aligned
            Checkpoint.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> [1]
            https://issues.apache.org/jira/browse/FLINK-26759
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> Best wishes
            >>>>>>>>>>>>>> fanrui
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton
            Kalashnikov <
            >>>>>>>>>> kaa....@yandex.com>
            >>>>>>>>>>>>>> wrote:
            >>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Hi fanrui,
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Thanks for creating the FLIP.
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> In general, I think the overdraft is good
            idea and it
            >>>> should
            >>>>>>> help
            >>>>>>>> in
            >>>>>>>>>>>>>>> described above cases. Here are my
            thoughts about
            >>>>>> configuration:
            >>>>>>>>>>>>>>> Please, correct me if I am wrong but as I
            understand right
            >>>>> now
            >>>>>>> we
            >>>>>>>>>> have
            >>>>>>>>>>>>>>> following calculation.
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> maxBuffersNumber(per TaskManager) = Network
            >>>>> memory(calculated
            >>>>>>> via
            >>>>>>>>>>>>>>> taskmanager.memory.network.fraction,
            >>>>>>>> taskmanager.memory.network.min,
            >>>>>>>>>>>>>>> taskmanager.memory.network.max and total
            memory size) /
            >>>>>>>>>>>>>>> taskmanager.memory.segment-size.
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> requiredBuffersNumber(per TaskManager) =
            (exclusive
            >>>> buffers
            >>>>> *
            >>>>>>>>>>>>>>> parallelism + floating buffers) * subtasks
            number in
            >>>>>> TaskManager
            >>>>>>>>>>>>>>> buffersInUseNumber = real number of
            buffers which used at
            >>>>>>> current
            >>>>>>>>>>>>>>> moment(always <= requiredBuffersNumber)
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be
            equal to
            >>>>>>> maxBuffersNumber
            >>>>>>>>>> which
            >>>>>>>>>>>>>>> allows Flink work predictibly. But if
            >>>> requiredBuffersNumber
            >>>>>>>>>>>>>>> maxBuffersNumber sometimes it is also
            fine(but not good)
            >>>>> since
            >>>>>>> not
            >>>>>>>>>> all
            >>>>>>>>>>>>>>> required buffers really mandatory(e.g. it
            is ok if Flink
            >>>> can
            >>>>>> not
            >>>>>>>>>>>>>>> allocate floating buffers)
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> But if maxBuffersNumber >
            requiredBuffersNumber, as I
            >>>>>> understand
            >>>>>>>>>> Flink
            >>>>>>>>>>>>>>> just never use these leftovers
            buffers(maxBuffersNumber -
            >>>>>>>>>>>>>>> requiredBuffersNumber). Which I propose to
            use. ( we can
            >>>>>> actualy
            >>>>>>>> use
            >>>>>>>>>>>>>>> even difference 'requiredBuffersNumber -
            >>>> buffersInUseNumber'
            >>>>>>> since
            >>>>>>>>>> if
            >>>>>>>>>>>>>>> one TaskManager contains several operators
            including
            >>>>> 'window'
            >>>>>>>> which
            >>>>>>>>>> can
            >>>>>>>>>>>>>>> temporally borrow buffers from the global
            pool).
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> My proposal, more specificaly(it relates
            only to
            >>>> requesting
            >>>>>>>> buffers
            >>>>>>>>>>>>>>> during processing single record while
            switching to
            >>>>>> unavalability
            >>>>>>>>>>>> between
            >>>>>>>>>>>>>>> records should be the same as we have it now):
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> * If one more buffer requested but
            maxBuffersPerChannel
            >>>>>> reached,
            >>>>>>>>>> then
            >>>>>>>>>>>>>>> just ignore this limitation and allocate
            this buffers from
            >>>>> any
            >>>>>>>>>>>>>>> place(from LocalBufferPool if it has
            something yet
            >>>> otherwise
            >>>>>>> from
            >>>>>>>>>>>>>>> NetworkBufferPool)
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> * If LocalBufferPool exceeds limit, then
            temporally
            >>>> allocate
            >>>>>> it
            >>>>>>>> from
            >>>>>>>>>>>>>>> NetworkBufferPool while it has something
            to allocate
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Maybe I missed something and this solution
            won't work,
            >>>> but I
            >>>>>>> like
            >>>>>>>> it
            >>>>>>>>>>>>>>> since on the one hand, it work from the
            scratch without
            >>>> any
            >>>>>>>>>>>>>>> configuration, on the other hand, it can
            be configuration
            >>>> by
            >>>>>>>>>> changing
            >>>>>>>>>>>>>>> proportion of maxBuffersNumber and
            requiredBuffersNumber.
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> The last thing that I want to say, I don't
            really want to
            >>>>>>>> implement
            >>>>>>>>>> new
            >>>>>>>>>>>>>>> configuration since even now it is not
            clear how to
            >>>>> correctly
            >>>>>>>>>> configure
            >>>>>>>>>>>>>>> network buffers with existing
            configuration and I don't
            >>>> want
            >>>>>> to
            >>>>>>>>>>>>>>> complicate it, especially if it will be
            possible to
            >>>> resolve
            >>>>>> the
            >>>>>>>>>> problem
            >>>>>>>>>>>>>>> automatically(as described above).
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> So is my understanding about network
            memory/buffers
            >>>> correct?
            >>>>>>>>>>>>>>> --
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> Best regards,
            >>>>>>>>>>>>>>> Anton Kalashnikov
            >>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan пишет:
            >>>>>>>>>>>>>>>> Hi everyone,
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a
            major feature of
            >>>>>> Flink.
            >>>>>>>> It
            >>>>>>>>>>>>>>>> effectively solves the problem of
            checkpoint timeout or
            >>>>> slow
            >>>>>>>>>>>>>>>> checkpoint when backpressure is severe.
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> We found that UC(Unaligned Checkpoint)
            does not work well
            >>>>>> when
            >>>>>>>> the
            >>>>>>>>>>>>>>>> back pressure is severe and multiple
            output buffers are
            >>>>>>> required
            >>>>>>>> to
            >>>>>>>>>>>>>>>> process a single record. FLINK-14396 [2]
            also mentioned
            >>>>> this
            >>>>>>>> issue
            >>>>>>>>>>>>>>>> before. So we propose the overdraft
            buffer to solve it.
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> I created FLINK-26762[3] and FLIP-227[4]
            to detail the
            >>>>>>> overdraft
            >>>>>>>>>>>>>>>> buffer mechanism. After discussing with Anton
            >>>> Kalashnikov,
            >>>>>>> there
            >>>>>>>>>> are
            >>>>>>>>>>>>>>>> still some points to discuss:
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>>       * There are already a lot of
            buffer-related
            >>>>>> configurations.
            >>>>>>>> Do
            >>>>>>>>>> we
            >>>>>>>>>>>>>>>>  need to add a new configuration for the
            overdraft
            >>>>>> buffer?
            >>>>>>>>>>>>>>>>       * Where should the overdraft buffer
            use memory?
            >>>>>>>>>>>>>>>>       * If the overdraft-buffer uses the
            memory remaining
            >>>> in
            >>>>>> the
            >>>>>>>>>>>>>>>>  NetworkBufferPool, no new configuration
            needs to be
            >>>>>>> added.
            >>>>>>>>>>>>>>>>       * If adding a new configuration:
            >>>>>>>>>>>>>>>>    o Should we set the
            overdraft-memory-size at the
            >>>> TM
            >>>>>>> level
            >>>>>>>>>> or
            >>>>>>>>>>>> the
            >>>>>>>>>>>>>>>>      Task level?
            >>>>>>>>>>>>>>>>    o Or set overdraft-buffers to indicate
            the number
            >>>>> of
            >>>>>>>>>>>>>>>>      memory-segments that can be overdrawn.
            >>>>>>>>>>>>>>>>    o What is the default value? How to
            set sensible
            >>>>>>>> defaults?
            >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and
            verified it using
            >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets
            overdraft-buffers at
            >>>>> Task
            >>>>>>>> level,
            >>>>>>>>>>>>>>>> and default value is 10. That is: each
            LocalBufferPool
            >>>> can
            >>>>>>>>>> overdraw up
            >>>>>>>>>>>>>>>> to 10 memory-segments.
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> Looking forward to your feedback!
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> Thanks,
            >>>>>>>>>>>>>>>> fanrui
            >>>>>>>>>>>>>>>>
            >>>>>>>>>>>>>>>> [1]
            >>>>>>>>>>>>>>>>
            >>
            
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
            >>>>>>>>>>>>>>>> [2]
            https://issues.apache.org/jira/browse/FLINK-14396
            >>>>>>>>>>>>>>>> [3]
            https://issues.apache.org/jira/browse/FLINK-26762
            >>>>>>>>>>>>>>>> [4]
            >>>>>>>>>>>>>>>>
            >>
            
https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
            >>>>>>>>>>>>>>>> [5]
            >>>>>>>>>>>>>>>>
            >>
            
https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
            >>>>>>>>>>>>>>>> [6]
            https://github.com/apache/flink-benchmarks/pull/54
            >>>>>>>>>>>> --
            >>>>>>>>>>>>
            >>>>>>>>>>>> Best regards,
            >>>>>>>>>>>> Anton Kalashnikov
            >>>>>>>>>>>>
            >>>>>>>>>>>>
            >>>>>>>>>> --
            >>>>>>>>>>
            >>>>>>>>>> Best regards,
            >>>>>>>>>> Anton Kalashnikov
            >>>>>>>>>>
            >>>>>>>>>>
            >> --
            >>
            >> Best regards,
            >> Anton Kalashnikov
            >>
            >>

Reply via email to