Hi,

Thanks a lot for the discussion.

After several rounds of discussion, I think things are clear now. I have updated the "Proposed Changes" section of FLIP-227 [1]. If I missed something, please add it to the FLIP directly, or mention it in this thread and I will add it. If everything is OK, I will create a new JIRA for the first task and use FLINK-26762 [2] for the second task.

About the legacy source: do we set maxOverdraftBuffersPerGate=0 directly? How do we identify a LegacySource? Or could we add an overdraftEnabled flag to LocalBufferPool? Its default value would be false, and it would be set to true once getAvailableFuture is called. The flag indicates whether isAvailable is checked elsewhere. This might be more general and cover more cases.

Also, I think the default value of 'max-overdraft-buffers-per-gate' needs to be confirmed. I would prefer a value between 5 and 10. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[2] https://issues.apache.org/jira/browse/FLINK-26762

Thanks
fanrui

On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi again,
>
> After sleeping on this, if both versions (reserve and overdraft) have the same complexity, I would also prefer the overdraft.
>
> > `Integer.MAX_VALUE` as default value was my idea as well but now, as Dawid mentioned, I think it is dangerous since it is too implicit for the user and if the user submits one more job for the same TaskManager
>
> As I mentioned, it's not only an issue with multiple jobs. The same problem can happen with different subtasks from the same job, potentially leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be in favour of Integer.MAX_VALUE as the default value, but as it is, I think we should indeed play it safe and limit it.
>
> > I still don't understand how the "reserve" implementation should be limited.
> > I mean if we have X buffers in total and the user sets overdraft equal to X we obviously can not reserve all buffers, but how many are we allowed to reserve? Should it be a different configuration like percentegeForReservedBuffers?
>
> The reserve could be defined as a percentage, or as a fixed number of buffers. But yes. In normal operation a subtask would not use the reserve, because if numberOfAvailableBuffers < reserve, the output would not be available. Only in the flatMap/timers/huge records case could the reserve be used.
>
> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, will LocalBufferPool never be available? Can't process data?
>
> Of course we would need to make sure that never happens. So the reserve should be < total buffer size.
>
> > 2. If the overdraft buffer uses the extra buffers, when the downstream task inputBuffer is insufficient, it should fail to start the job, and then restart? When the InputBuffer is initialized, it will apply for enough buffers, right?
>
> The failover when the downstream can not allocate buffers is already implemented in FLINK-14872 [2]. There is a timeout for how long the task waits for buffer allocation. However, this doesn't prevent many (potentially infinitely many) deadlock/restart cycles. IMO the proper solution for [1] would be 2b described in the ticket:
>
> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This is a simplified version of 2a, without tracking the tasks sink-to-source.
>
> But that's a pre-existing problem and I don't think we have to solve it before implementing overdraft. I think we would need to solve it only before setting Integer.MAX_VALUE as the default for the overdraft. I would hesitate to set the overdraft to anything more than a couple of buffers by default for the same reason.
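
[Editorial sketch] The "reserve" variant described in the message above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: `ReserveBufferPool` and all of its members are hypothetical names, not Flink's `LocalBufferPool` API, and a real pool would block instead of returning `false`.

```java
// Hypothetical sketch of the "reserve" variant; the names are illustrative, not Flink's API.
final class ReserveBufferPool {
    private final int totalBuffers;
    private final int reserve;
    private int usedBuffers;

    ReserveBufferPool(int totalBuffers, int reserve) {
        // The reserve must be smaller than the pool size, as required in the discussion.
        if (reserve < 1 || reserve >= totalBuffers) {
            throw new IllegalArgumentException("reserve must be in [1, totalBuffers)");
        }
        this.totalBuffers = totalBuffers;
        this.reserve = reserve;
    }

    int freeBuffers() {
        return totalBuffers - usedBuffers;
    }

    // Checked between records: the output is unavailable once fewer than `reserve` buffers are free.
    boolean isAvailable() {
        return freeBuffers() >= reserve;
    }

    // Called while a record is being processed (flatMap, timers, huge records): may use the reserve.
    boolean requestBuffer() {
        if (usedBuffers == totalBuffers) {
            return false; // a real pool would block here
        }
        usedBuffers++;
        return true;
    }

    void recycleBuffer() {
        if (usedBuffers > 0) {
            usedBuffers--;
        }
    }
}
```

With 10 buffers and a reserve of 3, the pool in this model reports unavailable after the eighth request, yet the record being processed can still obtain the remaining two buffers.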
>
> > Actually, I totally agree that we don't need a lot of buffers for overdraft
>
> and
>
> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. When we finish this feature and after users use it, if users report this issue we can discuss again.
>
> +1
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-13203
> [2] https://issues.apache.org/jira/browse/FLINK-14872
>
> On Thu, 5 May 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> I still have some questions.
>>
>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will LocalBufferPool never be available? Can't process data?
>> 2. If the overdraft buffer uses the extra buffers, when the downstream task inputBuffer is insufficient, it should fail to start the job, and then restart? When the InputBuffer is initialized, it will apply for enough buffers, right?
>>
>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. When we finish this feature and after users use it, if users report this issue we can discuss again.
>>
>> Thanks
>> fanrui
>>
>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>>> Hey all,
>>>
>>> I have not replied in the thread yet, but I was following the discussion.
>>>
>>> Personally, I like Fanrui's and Anton's idea. As far as I understand it, the idea to distinguish between inside flatMap & outside would be fairly simple, but maybe slightly indirect. The checkAvailability would remain unchanged and it is always checked between separate invocations of the UDF. Therefore the overdraft buffers would not apply there. However, once the pool says it is available, it means it has at least an initial buffer. So any additional request without checking for availability can be considered to be inside of processing a single record.
>>> The only case where this does not hold is the LegacySource, as I don't think it actually checks for the availability of buffers in the LocalBufferPool.
>>>
>>> In the offline chat with Anton, we also discussed whether we need a limit on the number of buffers we could overdraft (or in other words whether the limit should be equal to Integer.MAX_VALUE), but personally I'd prefer to stay on the safe side and have it limited. The pool of network buffers is shared across the entire TaskManager, which means it can be shared even across tasks of separate jobs. However, I might just be unnecessarily cautious here.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>>> > Hi,
>>> >
>>> > Thanks for the answers.
>>> >
>>> >> we may still need to discuss whether the overdraft/reserve/spare should use extra buffers or buffers in (exclusive + floating buffers)?
>>> >
>>> > and
>>> >
>>> >> These things solve different problems (at least as I see it). The current hardcoded "1" says that we switch from "availability" to "unavailability" when one more buffer is left (actually a little less than one buffer, since we write the last piece of data to this last buffer). The overdraft feature doesn't change this logic: we still want to switch to "unavailability" in the same way, but if we are already in "unavailability" and we want more buffers, then we can take up to "overdraft number" more. So we can not avoid this hardcoded "1", since we need to know when we should switch to "unavailability".
>>> >
>>> > Ok, I see. So it seems to me that both of you have in mind to keep the buffer pools as they are right now, but if we are in the middle of processing a record, we can request extra overdraft buffers on top of those? This is a different way to implement the overdraft from what I was thinking.
>>> > I was thinking about something like keeping the "overdraft", or more precisely a buffer "reserve", in the buffer pool. I think my version would be easier to implement, because it is just fiddling with the min/max buffers calculation and a slightly modified `checkAvailability()` logic.
>>> >
>>> > On the other hand, what you have in mind would better utilise the available memory, right? It would require more code changes (how would we know when we are allowed to request the overdraft?). However, in this case, I would be tempted to set the number of overdraft buffers by default to `Integer.MAX_VALUE`, and let the system request as many buffers as necessary. The only downside that I can think of (apart from higher complexity) would be a higher chance of hitting a known/unsolved deadlock [1] in the following scenario:
>>> > - downstream task hasn't yet started
>>> > - upstream task requests overdraft and uses all available memory segments from the global pool
>>> > - upstream task is blocked, because downstream task hasn't started yet and can not consume any data
>>> > - downstream task tries to start, but can not, as there are no available buffers
>>> >
>>> >> BTW, for watermark, the number of buffers it needs is numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, the watermark won't block in requestMemory.
>>> >
>>> > and
>>> >
>>> >> the best overdraft size will be equal to parallelism.
>>> >
>>> > That's a lot of buffers. I don't think we need that many for broadcasting watermarks. Watermarks are small, and remember that every subpartition has some partially filled/empty WIP buffer, so the vast majority of subpartitions will not need to request a new buffer.
>>> >
>>> > Best,
>>> > Piotrek
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>>> >
>>> > On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> >> Do you mean to ignore it while processing records, but keep using `maxBuffersPerChannel` when calculating the availability of the output?
>>> >>
>>> >> Yes, that is correct.
>>> >>
>>> >> >> Would it be a big issue if we changed it to check if at least "overdraft number of buffers are available", where "overdraft number" is configurable, instead of the currently hardcoded value of "1"?
>>> >>
>>> >> These things solve different problems (at least as I see it). The current hardcoded "1" says that we switch from "availability" to "unavailability" when one more buffer is left (actually a little less than one buffer, since we write the last piece of data to this last buffer). The overdraft feature doesn't change this logic: we still want to switch to "unavailability" in the same way, but if we are already in "unavailability" and we want more buffers, then we can take up to "overdraft number" more. So we can not avoid this hardcoded "1", since we need to know when we should switch to "unavailability".
>>> >>
>>> >> -- About "reserve" vs "overdraft"
>>> >>
>>> >> As Fanrui mentioned above, perhaps the best overdraft size will be equal to the parallelism. Also, the user can set any value they want. So even if the parallelism is small (~5) but the user's flatMap produces a lot of data, the user can set 10 or even more, which almost doubles the max buffers and will be impossible to reserve. At least we need to figure out how to protect against such cases (a limit for the overdraft?). So it actually looks even more difficult than increasing the maximum buffers.
>>> >>
>>> >> I want to emphasize that the overdraft is a soft configuration, which means it takes as many buffers as the global buffer pool has available (maybe zero) but no more than the configured value. It is also important to note that perhaps not many subtasks in a TaskManager will be using this feature, so we don't actually need a lot of available buffers for every subtask. (Here, I mean that if we have only one window/flatMap operator and many other operators, then one TaskManager will have many ordinary subtasks which don't actually need overdraft and several subtasks that need this feature.) But in the case of reservation, we will reserve some buffers for all operators even if they don't really need them.
>>> >>
>>> >> -- Legacy source problem
>>> >>
>>> >> If we still want to change max buffers then it is a problem for LegacySources (since every subtask of the source will always use these overdraft buffers). But right now, I think that we can force 0 overdraft buffers for legacy subtasks in the configuration during execution (if it is not too late to change the configuration at that point).
>>> >>
>>> >> On 03.05.2022 14:11, rui fan wrote:
>>> >>> Hi
>>> >>>
>>> >>> Thanks to Martijn Visser and Piotrek for the feedback. I agree with ignoring the legacy source; it would affect our design. Users should use the new Source API as much as possible.
>>> >>>
>>> >>> Hi Piotrek, we may still need to discuss whether the overdraft/reserve/spare should use extra buffers or buffers in (exclusive + floating buffers)? They have some differences.
>>> >>>
>>> >>> If it uses extra buffers:
>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + 1 <= currentPoolSize) and all subpartitions don't reach the maxBuffersPerChannel.
>>> >>>
>>> >>> If it uses the buffers in (exclusive + floating buffers):
>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + overdraftBuffers <= currentPoolSize) and all subpartitions don't reach the maxBuffersPerChannel.
>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large (>8), the usedBuffers will be small. That is, the LocalBufferPool will easily become unavailable. For throughput, if users turn up the overdraft buffers, they need to turn up exclusive or floating buffers. It also affects the InputChannel, and it is unfriendly to users.
>>> >>>
>>> >>> So I prefer the overdraft to use extra buffers.
>>> >>>
>>> >>> BTW, for watermark, the number of buffers it needs is numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, the watermark won't block in requestMemory. But it has 2 problems:
>>> >>> 1. It needs more overdraft buffers. If the overdraft uses (exclusive + floating buffers), there will be fewer buffers available. Throughput may be affected.
>>> >>> 2. The numberOfSubpartitions is different for each Task. So if users want to cover watermark using this feature, they don't know how to set the overdraftBuffers reasonably. And if the parallelism is changed, users still need to change overdraftBuffers. It is unfriendly to users.
>>> >>>
>>> >>> So I propose we support overdraftBuffers=-1. It means we will automatically set overdraftBuffers=numberOfSubpartitions in the constructor of LocalBufferPool.
>>> >>>
>>> >>> Please correct me if I'm wrong.
>>> >>>
>>> >>> Thanks
>>> >>> fanrui
>>> >>>
>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>> >>>> Hi fanrui,
>>> >>>>
>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers * parallelism + floating buffers)?
>>> >>>>> The LocalBufferPool will be available when (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers) and all subpartitions don't reach the maxBuffersPerChannel, right?
>>> >>>>
>>> >>>> I'm not sure. We would definitely need to adjust the minimum number of required buffers, just as we did when we were implementing the non-blocking outputs and adding availability logic to LocalBufferPool. Back then we added "+ 1" to the minimum number of buffers. Currently this logic is located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>>> >>>>
>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>>> >>>>
>>> >>>> For performance reasons, we always require at least one buffer per sub-partition. Otherwise performance falls drastically. Now if we require 5 overdraft buffers for the output to be available, we need to have them on top of those "one buffer per sub-partition". So the logic should be changed to:
>>> >>>>
>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
>>> >>>>
>>> >>>> Regarding increasing the number of max buffers, I'm not sure it is needed as long as "overdraft << max number of buffers", because all buffers on the outputs are shared across all sub-partitions. If we have 5 overdraft buffers and a parallelism of 100, it doesn't matter in the grand scheme of things whether we make the output available if at least one single buffer is available or at least 5 buffers are available out of ~200 (100 * 2 + 8). So the effects of increasing the overdraft from 1 to, for example, 5 should be negligible.
>>> >>>> For small parallelism, like 5, increasing overdraft from 1 to 5 still increases the overdraft by only about 25%. So maybe we can keep the max as it is?
>>> >>>>
>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer reserve" or "spare buffers"? And document it as "number of buffers kept in reserve in case of flatMap/firing timers/huge records"?
>>> >>>>
>>> >>>> What do you think Fanrui, Anton?
>>> >>>>
>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new features, as long as we don't break the existing deployments too much.
>>> >>>>
>>> >>>> Best,
>>> >>>> Piotrek
>>> >>>>
>>> >>>> On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com> wrote:
>>> >>>>> Hi everyone,
>>> >>>>>
>>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO, we should not focus too much on improving/adding capabilities for legacy sources. We want to persuade and push users to use the new Source API. Yes, this means that there's work required by the end users to port any custom source to the new interface. The benefits of the new Source API should outweigh this. Anything that we build to support multiple interfaces means adding more complexity and more possibilities for bugs. Let's try to make our lives a little bit easier.
>>> >>>>>
>>> >>>>> Best regards,
>>> >>>>>
>>> >>>>> Martijn Visser
>>> >>>>> https://twitter.com/MartijnVisser82
>>> >>>>> https://github.com/MartijnVisser
>>> >>>>>
>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>>> >>>>>
>>> >>>>>> Hi Piotrek
>>> >>>>>>
>>> >>>>>>> Do you mean to ignore it while processing records, but keep using `maxBuffersPerChannel` when calculating the availability of the output?
>>> >>>>>>
>>> >>>>>> I think yes. Anton Kalashnikov, could you please help double check?
>>> >>>>>>
>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big problem that legacy sources would be ignoring it? Note that we already have effectively hardcoded a single overdraft buffer. `LocalBufferPool#checkAvailability` checks if there is a single buffer available and this works the same for all tasks (including legacy source tasks). Would it be a big issue if we changed it to check if at least "overdraft number of buffers are available", where "overdraft number" is configurable, instead of the currently hardcoded value of "1"?
>>> >>>>>>
>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers * parallelism + floating buffers)? The LocalBufferPool will be available when (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers) and all subpartitions don't reach the maxBuffersPerChannel, right?
>>> >>>>>>
>>> >>>>>> If yes, I think it can solve the problem of the legacy source. There may be some impact. If overdraftBuffers is large and only one buffer is used to process a single record, (exclusive buffers * parallelism + floating buffers) cannot be fully used. It may only be possible to use (exclusive buffers * parallelism + floating buffers - overdraft buffers + 1). For throughput, if the Flink user turns up the overdraft buffers, they also need to turn up exclusive or floating buffers. And it also affects the InputChannel.
>>> >>>>>>
>>> >>>>>> If not, I don't think it can solve the problem of the legacy source.
>>> >>>>>> The legacy source doesn't check isAvailable. If there are extra buffers, the legacy source will use them up until it blocks in requestMemory.
>>> >>>>>>
>>> >>>>>> Thanks
>>> >>>>>> fanrui
>>> >>>>>>
>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>> >>>>>>
>>> >>>>>>> Hi,
>>> >>>>>>>
>>> >>>>>>> +1 for the general proposal from my side. It would be a nice workaround for the flatMap, WindowOperator and large record issues with unaligned checkpoints.
>>> >>>>>>>
>>> >>>>>>>> The first task is about ignoring max buffers per channel. This means if we request a memory segment from LocalBufferPool and the maxBuffersPerChannel is reached for this channel, we just ignore that and continue to allocate buffers while LocalBufferPool has them (it is actually not an overdraft).
>>> >>>>>>>
>>> >>>>>>> Do you mean to ignore it while processing records, but keep using `maxBuffersPerChannel` when calculating the availability of the output?
>>> >>>>>>>
>>> >>>>>>>> The second task is about the real overdraft. I am pretty convinced now that we, unfortunately, need a configuration to limit the overdraft number (because it is not ok if one subtask allocates all buffers of one TaskManager, considering that several different jobs can be submitted on this TaskManager). So the idea is to have maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool). In this case, when the limit of buffers in LocalBufferPool is reached, LocalBufferPool can additionally request up to maxOverdraftBuffersPerPartition buffers from NetworkBufferPool.
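
[Editorial sketch] The second task quoted above (a bounded overdraft taken from the global pool once the local limit is reached) could be modelled roughly as follows. This is a simplified illustration under stated assumptions: `GlobalBufferPool` and `OverdraftLocalPool` are hypothetical names, not Flink's `NetworkBufferPool`/`LocalBufferPool` classes, regular buffers are assumed to be already assigned to the local pool, and a real pool would block rather than return `false`.

```java
// Hypothetical sketch of the overdraft idea; all names are illustrative, not Flink's API.
final class GlobalBufferPool {
    private int available;

    GlobalBufferPool(int available) {
        this.available = available;
    }

    // Hands out one buffer if any is left; never blocks.
    boolean tryTake() {
        if (available == 0) {
            return false;
        }
        available--;
        return true;
    }

    void giveBack() {
        available++;
    }

    int available() {
        return available;
    }
}

final class OverdraftLocalPool {
    private final GlobalBufferPool global;
    private final int poolSize;            // regular buffers, assumed already assigned to this pool
    private final int maxOverdraftBuffers; // upper bound on extra buffers borrowed from the global pool
    private int used;
    private int overdraftUsed;

    OverdraftLocalPool(GlobalBufferPool global, int poolSize, int maxOverdraftBuffers) {
        this.global = global;
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    // Availability ignores the overdraft: it only reports whether a regular buffer is free.
    boolean isAvailable() {
        return used < poolSize;
    }

    boolean requestBuffer() {
        if (used < poolSize) {
            used++;
            return true;
        }
        // Regular limit reached: borrow from the global pool, bounded by the configured maximum.
        if (overdraftUsed < maxOverdraftBuffers && global.tryTake()) {
            overdraftUsed++;
            return true;
        }
        return false; // a real pool would block here
    }

    // Overdraft buffers go back to the global pool before regular buffers are freed.
    void recycleBuffer() {
        if (overdraftUsed > 0) {
            overdraftUsed--;
            global.giveBack();
        } else if (used > 0) {
            used--;
        }
    }

    int overdraftUsed() {
        return overdraftUsed;
    }
}
```

In this model the limit is soft: a pool configured with a maximum of 5 overdraft buffers still obtains only as many as the global pool can spare, and a maximum of 0 (for example for legacy sources) disables the overdraft entirely.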
>>> >>>>>>>
>>> >>>>>>> +1 for just having this as a separate configuration. Is it a big problem that legacy sources would be ignoring it? Note that we already have effectively hardcoded a single overdraft buffer. `LocalBufferPool#checkAvailability` checks if there is a single buffer available and this works the same for all tasks (including legacy source tasks). Would it be a big issue if we changed it to check if at least "overdraft number of buffers are available", where "overdraft number" is configurable, instead of the currently hardcoded value of "1"?
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Piotrek
>>> >>>>>>>
>>> >>>>>>> On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>>> Let me add some information about the LegacySource.
>>> >>>>>>>>
>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource, could we add an enableOverdraft flag to LocalBufferPool? The default value is false. If getAvailableFuture is called, set enableOverdraft=true. It indicates whether isAvailable is checked elsewhere.
>>> >>>>>>>>
>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if I'm wrong.
>>> >>>>>>>>
>>> >>>>>>>> Thanks
>>> >>>>>>>> fanrui
>>> >>>>>>>>
>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>>> >>>>>>>>> Hi,
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks for your quick response.
>>> >>>>>>>>>
>>> >>>>>>>>> For questions 1/2/3, we think they are clear. We just need to discuss the default value in the PR.
>>> >>>>>>>>>
>>> >>>>>>>>> For the legacy source, you are right. It's difficult to implement in a general way.
>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in SourceFunction.SourceContext and call it in our common LegacySources, e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so fixing FlinkKafkaConsumer solved most of our problems.
>>> >>>>>>>>>
>>> >>>>>>>>> Core code:
>>> >>>>>>>>> ```
>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>>> >>>>>>>>>     // Nothing to wait for without a writer, with aligned checkpoints, or when the output is available.
>>> >>>>>>>>>     if (recordWriter == null
>>> >>>>>>>>>             || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>>> >>>>>>>>>         return;
>>> >>>>>>>>>     }
>>> >>>>>>>>>
>>> >>>>>>>>>     // Block until the record writer becomes available again.
>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>>> >>>>>>>>>     try {
>>> >>>>>>>>>         resumeFuture.get();
>>> >>>>>>>>>     } catch (Throwable ignored) {
>>> >>>>>>>>>     }
>>> >>>>>>>>> }
>>> >>>>>>>>> ```
>>> >>>>>>>>>
>>> >>>>>>>>> The LegacySource calls sourceContext.ensureRecordWriterIsAvailable() before it enters synchronized (checkpointLock) and collects records. Please let me know if there is a better solution.
>>> >>>>>>>>>
>>> >>>>>>>>> Thanks
>>> >>>>>>>>> fanrui
>>> >>>>>>>>>
>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> Hi.
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two commits in a PR?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Perhaps a separate ticket will be better, since this task has fewer open questions, but we should find a solution for LegacySource first.
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- 2. For the first task, if the flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel?
>>> >>>>>>>>>> Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for the downstream Task to consume.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I understand it, the overdraft may not be really helpful for AC, but it doesn't make it worse either.
>>> >>>>>>>>>>
>>> >>>>>>>>>> 3. For the second task
>>> >>>>>>>>>> -- - The default value of maxOverdraftBuffersPerPartition may also need to be discussed.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it is a kind of optimization and users should understand what they are doing (especially if we implement the first task).
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, can we set maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint.
>>> >>>>>>>>>>
>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause degradation for the Aligned Checkpoint, I don't think that we should differentiate between AC and UC.
>>> >>>>>>>>>>
>>> >>>>>>>>>> 4. For the legacy source
>>> >>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up to maxOverdraftBuffersPerPartition buffers.
>>> >>>>>>>>>> - If disabling the UC, it doesn't use the overdraft buffer.
>>> >>>>>>>>>> - Do you think it's ok?
>>> >>>>>>>>>>
>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all, since it can lead to undesirable results, especially if the limit is high. At least, as I understand it, it will always work in overdraft mode and it will borrow maxOverdraftBuffersPerPartition buffers from the global pool, which can lead to degradation of other subtasks on the same TaskManager.
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well.
>>> >>>>>>>>>>
>>> >>>>>>>>>> I don't really understand how it is possible in the general case, considering that each user has their own implementation of LegacySourceOperator.
>>> >>>>>>>>>>
>>> >>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? I submitted the PR [1].
>>> >>>>>>>>>>
>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 29.04.2022 14:14, rui fan wrote:
>>> >>>>>>>>>>> Hi,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 1. Do you mean split this into two JIRAs or two PRs or two commits in a PR?
>>> >>>>>>>>>>> 2. For the first task, if the flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for the downstream Task to consume.
>>> >>>>>>>>>>> 3. For the second task
>>> >>>>>>>>>>>    - The default value of maxOverdraftBuffersPerPartition may also need to be discussed.
>>> >>>>>>>>>>>    - If the user disables the Unaligned Checkpoint, can we set maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint.
>>> >>>>>>>>>>> 4. For the legacy source
>>> >>>>>>>>>>>    - If enabling the Unaligned Checkpoint, it uses up to maxOverdraftBuffersPerPartition buffers.
>>> >>>>>>>>>>>    - If disabling the UC, it doesn't use the overdraft buffer.
>>> >>>>>>>>>>>    - Do you think it's ok?
>>> >>>>>>>>>>>    - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well.
>>> >>>>>>>>>>> 5. For the benchmark, do you have any suggestions? I submitted the PR [1].
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks
>>> >>>>>>>>>>> fanrui
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> Hi,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some conclusions:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
>>> >>>>>>>>>>>> This means that if we request a memory segment from
>>> >>>>>>>>>>>> LocalBufferPool and maxBuffersPerChannel is reached for this
>>> >>>>>>>>>>>> channel, we just ignore that and continue to allocate buffers
>>> >>>>>>>>>>>> while LocalBufferPool still has them (this is actually not an
>>> >>>>>>>>>>>> overdraft).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty convinced
>>> >>>>>>>>>>>> now that we, unfortunately, need a configuration option to limit
>>> >>>>>>>>>>>> the number of overdraft buffers (because it is not OK if one
>>> >>>>>>>>>>>> subtask allocates all buffers of one TaskManager, considering that
>>> >>>>>>>>>>>> several different jobs can be submitted to this TaskManager). So
>>> >>>>>>>>>>>> the idea is to have maxOverdraftBuffersPerPartition (technically
>>> >>>>>>>>>>>> speaking, per LocalBufferPool). In this case, when the buffer
>>> >>>>>>>>>>>> limit of a LocalBufferPool is reached, the LocalBufferPool can
>>> >>>>>>>>>>>> additionally request up to maxOverdraftBuffersPerPartition buffers
>>> >>>>>>>>>>>> from NetworkBufferPool.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it
>>> >>>>>>>>>>>> actually works as an unlimited flatMap and will always work in
>>> >>>>>>>>>>>> overdraft mode, which is not the goal. So we still need to think
>>> >>>>>>>>>>>> about that.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 29.04.2022 11:11, rui fan wrote:
>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of
>>> >>>>>>>>>>>>> overdraft segments that each LocalBufferPool can apply for, right?
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> I prefer to hard code maxOverdraftBuffers so that we don't add a
>>> >>>>>>>>>>>>> new configuration. And I hope to hear more from the community.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Best wishes
>>> >>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com>
>>> >>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Thanks for your very clear reply. I think you are totally right.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
>>> >>>>>>>>>>>>>> overdraft buffer, so it won't need a new buffer configuration.
>>> >>>>>>>>>>>>>> Flink users can turn up maxBuffersNumber to control the
>>> >>>>>>>>>>>>>> overdraft buffer size.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we should
>>> >>>>>>>>>>>>>> limit the maximum number of overdraft segments that each
>>> >>>>>>>>>>>>>> LocalBufferPool can apply for.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Why do we limit it?
>>> >>>>>>>>>>>>>> Some operators, such as LegacySource, don't check
>>> >>>>>>>>>>>>>> `recordWriter.isAvailable` while processing records. I have
>>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there are
>>> >>>>>>>>>>>>>> other cases.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> If we don't add the limitation, the LegacySource will use up
>>> >>>>>>>>>>>>>> all remaining memory in the NetworkBufferPool when the
>>> >>>>>>>>>>>>>> backpressure is severe.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> How to limit it?
>>> >>>>>>>>>>>>>> I prefer to hard code `maxOverdraftBuffers=numberOfSubpartitions`
>>> >>>>>>>>>>>>>> in the constructor of LocalBufferPool.
>>> >>>>>>>>>>>>>> The maxOverdraftBuffers is just for safety, and it should be
>>> >>>>>>>>>>>>>> enough for most Flink jobs. Or we can set
>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
>>> >>>>>>>>>>>>>> handle some jobs with low parallelism.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we
>>> >>>>>>>>>>>>>> can set maxOverdraftBuffers=0 in the constructor of
>>> >>>>>>>>>>>>>> LocalBufferPool, because the overdraft isn't useful for the
>>> >>>>>>>>>>>>>> Aligned Checkpoint.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Best wishes
>>> >>>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>>> >>>>>>>>>>>>>> <kaa....@yandex.com> wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Hi fanrui,
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it should
>>> >>>>>>>>>>>>>>> help in the cases described above. Here are my thoughts about
>>> >>>>>>>>>>>>>>> configuration:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand it, right
>>> >>>>>>>>>>>>>>> now we have the following calculation.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = network memory (calculated
>>> >>>>>>>>>>>>>>> via taskmanager.memory.network.fraction,
>>> >>>>>>>>>>>>>>> taskmanager.memory.network.min, taskmanager.memory.network.max
>>> >>>>>>>>>>>>>>> and total memory size) / taskmanager.memory.segment-size.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers *
>>> >>>>>>>>>>>>>>> parallelism + floating buffers) * number of subtasks in the
>>> >>>>>>>>>>>>>>> TaskManager
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> buffersInUseNumber = the real number of buffers in use at the
>>> >>>>>>>>>>>>>>> current moment (always <= requiredBuffersNumber)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
>>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably. But
>>> >>>>>>>>>>>>>>> if requiredBuffersNumber > maxBuffersNumber, sometimes it is
>>> >>>>>>>>>>>>>>> also fine (but not good), since not all required buffers are
>>> >>>>>>>>>>>>>>> really mandatory (e.g. it is OK if Flink cannot allocate
>>> >>>>>>>>>>>>>>> floating buffers).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover buffers
>>> >>>>>>>>>>>>>>> (maxBuffersNumber - requiredBuffersNumber), which I propose to
>>> >>>>>>>>>>>>>>> use. (We can actually even use the difference
>>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>>> >>>>>>>>>>>>>>> TaskManager may contain several operators, including 'window',
>>> >>>>>>>>>>>>>>> which can temporarily borrow buffers from the global pool.)
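
The buffer accounting quoted above can be worked through with a small sketch. This is only an illustration of the arithmetic as stated in the mail, not Flink code: the class and method names are hypothetical, and the input values are made up for the example.

```java
/** Illustrative arithmetic only; the names here are hypothetical, not Flink APIs. */
final class BufferAccountingSketch {

    /** maxBuffersNumber = network memory / segment size (both in bytes). */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks. */
    static long requiredBuffersNumber(
            int exclusiveBuffers, int parallelism, int floatingBuffers, int subtasks) {
        return ((long) exclusiveBuffers * parallelism + floatingBuffers) * subtasks;
    }

    /** Buffers that are never requested; zero when the pool is undersized. */
    static long leftoverBuffers(long maxBuffers, long requiredBuffers) {
        return Math.max(0L, maxBuffers - requiredBuffers);
    }

    public static void main(String[] args) {
        // Assumed example values: 128 MiB of network memory, 32 KiB segments.
        long max = maxBuffersNumber(128L * 1024 * 1024, 32L * 1024);
        // Assumed example values: 2 exclusive buffers, parallelism 100,
        // 8 floating buffers, 10 subtasks on the TaskManager.
        long required = requiredBuffersNumber(2, 100, 8, 10);
        System.out.println("max=" + max + ", required=" + required
                + ", leftover=" + leftoverBuffers(max, required));
    }
}
```

With these assumed inputs the pool holds 4096 segments, 2080 are required, and 2016 are left over; those leftovers are what the proposal wants to lend out as overdraft.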
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting
>>> >>>>>>>>>>>>>>> buffers while processing a single record; switching to
>>> >>>>>>>>>>>>>>> unavailability between records should stay the same as it is
>>> >>>>>>>>>>>>>>> now):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is
>>> >>>>>>>>>>>>>>>   reached, just ignore this limitation and allocate the buffer
>>> >>>>>>>>>>>>>>>   from any place (from LocalBufferPool if it still has
>>> >>>>>>>>>>>>>>>   something, otherwise from NetworkBufferPool).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, temporarily allocate
>>> >>>>>>>>>>>>>>>   from NetworkBufferPool while it has something to allocate.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I
>>> >>>>>>>>>>>>>>> like it because, on the one hand, it works out of the box
>>> >>>>>>>>>>>>>>> without any configuration, and on the other hand, it can be
>>> >>>>>>>>>>>>>>> configured by changing the ratio of maxBuffersNumber to
>>> >>>>>>>>>>>>>>> requiredBuffersNumber.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> The last thing I want to say is that I don't really want to
>>> >>>>>>>>>>>>>>> introduce a new configuration, since even now it is not clear
>>> >>>>>>>>>>>>>>> how to correctly configure network buffers with the existing
>>> >>>>>>>>>>>>>>> configuration, and I don't want to complicate it, especially
>>> >>>>>>>>>>>>>>> if it is possible to resolve the problem automatically (as
>>> >>>>>>>>>>>>>>> described above).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> So is my understanding of network memory/buffers correct?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> --
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Best regards,
>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan wrote:
>>> >>>>>>>>>>>>>>>> Hi everyone,
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
>>> >>>>>>>>>>>>>>>> Flink. It effectively solves the problem of checkpoint
>>> >>>>>>>>>>>>>>>> timeouts or slow checkpoints when backpressure is severe.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well
>>> >>>>>>>>>>>>>>>> when the backpressure is severe and multiple output buffers
>>> >>>>>>>>>>>>>>>> are required to process a single record. FLINK-14396 [2]
>>> >>>>>>>>>>>>>>>> also mentioned this issue before. So we propose the
>>> >>>>>>>>>>>>>>>> overdraft buffer to solve it.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>>> >>>>>>>>>>>>>>>> overdraft buffer mechanism. After discussing with Anton
>>> >>>>>>>>>>>>>>>> Kalashnikov, there are still some points to discuss:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>  * There are already a lot of buffer-related configurations.
>>> >>>>>>>>>>>>>>>>    Do we need to add a new configuration for the overdraft
>>> >>>>>>>>>>>>>>>>    buffer?
>>> >>>>>>>>>>>>>>>>  * Where should the overdraft buffer take its memory from?
>>> >>>>>>>>>>>>>>>>  * If the overdraft buffer uses the memory remaining in the
>>> >>>>>>>>>>>>>>>>    NetworkBufferPool, no new configuration needs to be added.
>>> >>>>>>>>>>>>>>>>  * If adding a new configuration:
>>> >>>>>>>>>>>>>>>>      o Should we set the overdraft-memory-size at the TM
>>> >>>>>>>>>>>>>>>>        level or the Task level?
>>> >>>>>>>>>>>>>>>>      o Or set overdraft-buffers to indicate the number of
>>> >>>>>>>>>>>>>>>>        memory-segments that can be overdrawn.
>>> >>>>>>>>>>>>>>>>      o What is the default value? How do we set sensible
>>> >>>>>>>>>>>>>>>>        defaults?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Currently, I have implemented a POC [5] and verified it
>>> >>>>>>>>>>>>>>>> using flink-benchmarks [6]. The POC sets overdraft-buffers
>>> >>>>>>>>>>>>>>>> at the Task level, and the default value is 10. That is,
>>> >>>>>>>>>>>>>>>> each LocalBufferPool can overdraw up to 10 memory-segments.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>>>>>> fanrui
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>>> >>>>>>>>>>>> --
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Best regards,
>>> >>>>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>> --
>>> >>>>>>>>>>
>>> >>>>>>>>>> Best regards,
>>> >>>>>>>>>> Anton Kalashnikov
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >> --
>>> >>
>>> >> Best regards,
>>> >> Anton Kalashnikov
>>> >>
>>> >>
>>> >>
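
For readers following the thread, the bounded overdraft discussed above can be sketched roughly as follows. This is a toy model under stated assumptions, not the POC and not Flink's LocalBufferPool or NetworkBufferPool: both classes and all of their methods are hypothetical, the regular quota is treated as already reserved, and there is no concurrency handling.

```java
/** Toy stand-in for the TaskManager-wide pool (hypothetical, not NetworkBufferPool). */
final class GlobalPoolSketch {
    private int free;

    GlobalPoolSketch(int free) { this.free = free; }

    /** Takes one segment if any is left. */
    boolean tryTake() {
        if (free == 0) { return false; }
        free--;
        return true;
    }

    void giveBack() { free++; }

    int free() { return free; }
}

/** Toy stand-in for a per-subtask pool with a bounded overdraft (hypothetical). */
final class LocalPoolSketch {
    private final GlobalPoolSketch global;
    private final int poolSize;            // regular quota, assumed pre-reserved
    private final int maxOverdraftBuffers; // the limit debated in the thread
    private int regularInUse;
    private int overdraftInUse;

    LocalPoolSketch(GlobalPoolSketch global, int poolSize, int maxOverdraftBuffers) {
        this.global = global;
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Serves from the regular quota first, then overdraws from the global pool. */
    boolean requestBuffer() {
        if (regularInUse < poolSize) {
            regularInUse++;
            return true;
        }
        if (overdraftInUse < maxOverdraftBuffers && global.tryTake()) {
            overdraftInUse++;
            return true;
        }
        return false;
    }

    /** Recycles one buffer; overdraft buffers go back to the global pool first. */
    void recycleBuffer() {
        if (overdraftInUse > 0) {
            overdraftInUse--;
            global.giveBack();
        } else if (regularInUse > 0) {
            regularInUse--;
        }
    }

    /** Availability ignores the overdraft: false once the regular quota is used up. */
    boolean isAvailable() { return regularInUse < poolSize; }

    public static void main(String[] args) {
        GlobalPoolSketch global = new GlobalPoolSketch(3);
        LocalPoolSketch local = new LocalPoolSketch(global, 2, 2);
        for (int i = 1; i <= 5; i++) {
            System.out.println("request " + i + ": " + local.requestBuffer()
                    + ", available=" + local.isAvailable());
        }
    }
}
```

In this model a task that checks isAvailable() between records stops at the regular quota, and only a request made in the middle of a record can reach the overdraft. Setting maxOverdraftBuffers to 0 disables the overdraft entirely, which corresponds to the suggestion made for the Aligned Checkpoint case.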