Hi Anton, Piotrek and Dawid,

Thanks for your help.

I created FLINK-27522 [1] as the first task and will finish it as soon as possible. @Piotrek, regarding the default value: do you think it should be less than 5? What about 3? Personally I don't think 5 is large; whether it is 1, 3, or 5 doesn't matter much, as long as the deadlock problem is resolved reasonably. Alternatively, I can move the second task forward first and we can settle the default value during the PR review.

For the legacy source, I understand your idea, and I propose creating a third task to handle it, since it is independent and only needed for compatibility with the old API. What do you think? I have updated the third task on FLIP-227 [2]. If that looks OK, I will create a JIRA issue for the third task, add it to FLIP-227, and implement the tasks in order from the first to the third.

Thanks again for your help.

[1] https://issues.apache.org/jira/browse/FLINK-27522
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer

Thanks
fanrui

On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi fanrui,
>
> > How to identify legacySource?
>
> Legacy sources always use the SourceStreamTask class, and SourceStreamTask
> is used only for legacy sources. But I'm not sure how to enable/disable
> that. Adding a `disableOverdraft()` call in SourceStreamTask would be
> better than relying on the `getAvailableFuture()` call (isn't it used for
> the back pressure metric anyway?). Ideally we should enable/disable it in
> the constructors, but that might be tricky.
>
> > I prefer it to be between 5 and 10
>
> I would vote for a smaller value because of FLINK-13203.
>
> Piotrek
>
> czw., 5 maj 2022 o 11:49 rui fan <1996fan...@gmail.com> napisał(a):
>
>> Hi,
>>
>> Thanks a lot for your discussion.
>>
>> After several discussions, I think it's clear now. I updated the
>> "Proposed Changes" section of FLIP-227 [1]. If I have missed anything,
>> please add it to the FLIP, or mention it in this thread and I can add it.
If everything is OK, I will create a >> new JIRA for the first task, and use FLINK-26762[2] as the >> second task. >> >> About the legacy source, do we set maxOverdraftBuffersPerGate=0 >> directly? How to identify legacySource? Or could we add >> the overdraftEnabled in LocalBufferPool? The default value >> is false. If the getAvailableFuture is called, change >> overdraftEnabled=true. >> It indicates whether there are checks isAvailable elsewhere. >> It might be more general, it can cover more cases. >> >> Also, I think the default value of 'max-overdraft-buffers-per-gate' >> needs to be confirmed. I prefer it to be between 5 and 10. How >> do you think? >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer >> [2] https://issues.apache.org/jira/browse/FLINK-26762 >> >> Thanks >> fanrui >> >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> >> wrote: >> >>> Hi again, >>> >>> After sleeping over this, if both versions (reserve and overdraft) have >>> the same complexity, I would also prefer the overdraft. >>> >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, as >>> > Dawid mentioned, I think it is dangerous since it is too implicit for >>> > the user and if the user submits one more job for the same TaskManger >>> >>> As I mentioned, it's not only an issue with multiple jobs. The same >>> problem can happen with different subtasks from the same job, potentially >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be >>> in favour of Integer.MAX_VALUE to be the default value, but as it is, I >>> think we should indeed play on the safe side and limit it. >>> >>> > I still don't understand how should be limited "reserve" >>> implementation. >>> > I mean if we have X buffers in total and the user sets overdraft equal >>> > to X we obviously can not reserve all buffers, but how many we are >>> > allowed to reserve? 
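[Editor's note] The overdraftEnabled idea quoted above (default false, flipped on once getAvailableFuture is called, so that legacy sources that never check availability get no overdraft) can be sketched with a toy pool model. This is illustrative only: the class and field names below are assumptions, not the actual LocalBufferPool API in flink-runtime.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Simplified model of a buffer pool that only permits overdraft for tasks
 * that actually check output availability (legacy sources never do).
 */
class OverdraftAwareBufferPool {
    private final int currentPoolSize;
    private final int maxOverdraftBuffers;
    private int usedBuffers;
    // Legacy sources never call getAvailableFuture(), so overdraft stays off.
    private boolean overdraftEnabled = false;

    OverdraftAwareBufferPool(int poolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    CompletableFuture<Void> getAvailableFuture() {
        // The caller checks availability between records,
        // so letting it overdraft mid-record is safe.
        overdraftEnabled = true;
        return CompletableFuture.completedFuture(null);
    }

    boolean requestBuffer() {
        if (usedBuffers < currentPoolSize) {
            usedBuffers++;
            return true;
        }
        // Pool exhausted: only overdraft if the task checks availability.
        if (overdraftEnabled && usedBuffers < currentPoolSize + maxOverdraftBuffers) {
            usedBuffers++;
            return true;
        }
        return false; // the real pool would block here
    }

    int used() {
        return usedBuffers;
    }
}
```

Under this model a legacy source is capped at the ordinary pool size, while a task that checks availability can draw up to maxOverdraftBuffers extra segments while processing a single record.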
Should it be a different configuration like >>> > percentegeForReservedBuffers? >>> >>> The reserve could be defined as percentage, or as a fixed number of >>> buffers. But yes. In normal operation subtask would not use the reserve, as >>> if numberOfAvailableBuffers < reserve, the output would be not available. >>> Only in the flatMap/timers/huge records case the reserve could be used. >>> >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, >>> will LocalBufferPool never be available? Can't process data? >>> >>> Of course we would need to make sure that never happens. So the reserve >>> should be < total buffer size. >>> >>> > 2. If the overdraft buffer use the extra buffers, when the downstream >>> > task inputBuffer is insufficient, it should fail to start the job, and >>> then >>> > restart? When the InputBuffer is initialized, it will apply for enough >>> > buffers, right? >>> >>> The failover if downstream can not allocate buffers is already >>> implemented FLINK-14872 [2]. There is a timeout for how long the task is >>> waiting for buffer allocation. However this doesn't prevent many >>> (potentially infinitely many) deadlock/restarts cycles. IMO the propper >>> solution for [1] would be 2b described in the ticket: >>> >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This >>> is a simplified version of 2a, without tracking the tasks sink-to-source. >>> >>> But that's a pre-existing problem and I don't think we have to solve it >>> before implementing overdraft. I think we would need to solve it only >>> before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I >>> would hesitate setting the overdraft to anything more then a couple of >>> buffers by default for the same reason. >>> >>> > Actually, I totally agree that we don't need a lot of buffers for >>> overdraft >>> >>> and >>> >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. 
>>> > When we finish this feature and after users use it, if users feedback >>> > this issue we can discuss again. >>> >>> +1 >>> >>> Piotrek >>> >>> [1] https://issues.apache.org/jira/browse/FLINK-13203 >>> [2] https://issues.apache.org/jira/browse/FLINK-14872 >>> >>> czw., 5 maj 2022 o 05:52 rui fan <1996fan...@gmail.com> napisał(a): >>> >>>> Hi everyone, >>>> >>>> I still have some questions. >>>> >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will >>>> LocalBufferPool never be available? Can't process data? >>>> 2. If the overdraft buffer use the extra buffers, when the downstream >>>> task inputBuffer is insufficient, it should fail to start the job, and >>>> then >>>> restart? When the InputBuffer is initialized, it will apply for enough >>>> buffers, right? >>>> >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. >>>> When we finish this feature and after users use it, if users feedback >>>> this issue we can discuss again. >>>> >>>> Thanks >>>> fanrui >>>> >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org> >>>> wrote: >>>> >>>>> Hey all, >>>>> >>>>> I have not replied in the thread yet, but I was following the >>>>> discussion. >>>>> >>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand >>>>> it >>>>> the idea to distinguish between inside flatMap & outside would be >>>>> fairly >>>>> simple, but maybe slightly indirect. The checkAvailability would >>>>> remain >>>>> unchanged and it is checked always between separate invocations of the >>>>> UDF. Therefore the overdraft buffers would not apply there. However >>>>> once >>>>> the pool says it is available, it means it has at least an initial >>>>> buffer. So any additional request without checking for availability >>>>> can >>>>> be considered to be inside of processing a single record. 
This does >>>>> not >>>>> hold just for the LegacySource as I don't think it actually checks for >>>>> the availability of buffers in the LocalBufferPool. >>>>> >>>>> In the offline chat with Anton, we also discussed if we need a limit >>>>> of >>>>> the number of buffers we could overdraft (or in other words if the >>>>> limit >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to >>>>> stay >>>>> on the safe side and have it limited. The pool of network buffers is >>>>> shared for the entire TaskManager, so it means it can be shared even >>>>> across tasks of separate jobs. However, I might be just unnecessarily >>>>> cautious here. >>>>> >>>>> Best, >>>>> >>>>> Dawid >>>>> >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote: >>>>> > Hi, >>>>> > >>>>> > Thanks for the answers. >>>>> > >>>>> >> we may still need to discuss whether the >>>>> >> overdraft/reserve/spare should use extra buffers or buffers >>>>> >> in (exclusive + floating buffers)? >>>>> > and >>>>> > >>>>> >> These things resolve the different problems (at least as I see >>>>> that). >>>>> >> The current hardcoded "1" says that we switch "availability" to >>>>> >> "unavailability" when one more buffer is left(actually a little less >>>>> >> than one buffer since we write the last piece of data to this last >>>>> >> buffer). The overdraft feature doesn't change this logic we still >>>>> want >>>>> >> to switch to "unavailability" in such a way but if we are already in >>>>> >> "unavailability" and we want more buffers then we can take >>>>> "overdraft >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need >>>>> to >>>>> >> understand when we should switch to "unavailability" >>>>> > Ok, I see. So it seems to me that both of you have in mind to keep >>>>> the >>>>> > buffer pools as they are right now, but if we are in the middle of >>>>> > processing a record, we can request extra overdraft buffers on top of >>>>> > those? 
This is another way to implement the overdraft to what I was >>>>> > thinking. I was thinking about something like keeping the >>>>> "overdraft" or >>>>> > more precisely buffer "reserve" in the buffer pool. I think my >>>>> version >>>>> > would be easier to implement, because it is just fiddling with >>>>> min/max >>>>> > buffers calculation and slightly modified `checkAvailability()` >>>>> logic. >>>>> > >>>>> > On the other hand what you have in mind would better utilise the >>>>> available >>>>> > memory, right? It would require more code changes (how would we know >>>>> when >>>>> > we are allowed to request the overdraft?). However, in this case, I >>>>> would >>>>> > be tempted to set the number of overdraft buffers by default to >>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers as >>>>> > necessary. The only downside that I can think of (apart of higher >>>>> > complexity) would be higher chance of hitting a known/unsolved >>>>> deadlock [1] >>>>> > in a scenario: >>>>> > - downstream task hasn't yet started >>>>> > - upstream task requests overdraft and uses all available memory >>>>> segments >>>>> > from the global pool >>>>> > - upstream task is blocked, because downstream task hasn't started >>>>> yet and >>>>> > can not consume any data >>>>> > - downstream task tries to start, but can not, as there are no >>>>> available >>>>> > buffers >>>>> > >>>>> >> BTW, for watermark, the number of buffers it needs is >>>>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, >>>>> >> the watermark won't block in requestMemory. >>>>> > and >>>>> > >>>>> >> the best overdraft size will be equal to parallelism. >>>>> > That's a lot of buffers. I don't think we need that many for >>>>> broadcasting >>>>> > watermarks. Watermarks are small, and remember that every >>>>> subpartition has >>>>> > some partially filled/empty WIP buffer, so the vast majority of >>>>> > subpartitions will not need to request a new buffer. 
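[Editor's note] The deadlock scenario Piotr lists above can be made concrete with a toy model of the TaskManager-wide segment pool. The class name and numbers are illustrative only; the real deadlock is the one described in FLINK-13203.

```java
/** Toy model: a TaskManager-wide segment pool shared by all subtasks. */
class GlobalPoolModel {
    private int availableSegments;

    GlobalPoolModel(int total) {
        this.availableSegments = total;
    }

    /** Hands out up to {@code wanted} segments; returns how many were granted. */
    int request(int wanted) {
        int granted = Math.min(wanted, availableSegments);
        availableSegments -= granted;
        return granted;
    }

    int available() {
        return availableSegments;
    }
}
```

With an unbounded overdraft the upstream task can drain every free segment before the downstream task starts, leaving it unable to allocate even its minimum buffers; a small cap leaves segments for the downstream task.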
>>>>> > >>>>> > Best, >>>>> > Piotrek >>>>> > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203 >>>>> > >>>>> > wt., 3 maj 2022 o 17:15 Anton Kalashnikov <kaa....@yandex.com> >>>>> napisał(a): >>>>> > >>>>> >> Hi, >>>>> >> >>>>> >> >>>>> >> >> Do you mean to ignore it while processing records, but keep >>>>> using >>>>> >> `maxBuffersPerChannel` when calculating the availability of the >>>>> output? >>>>> >> >>>>> >> >>>>> >> Yes, it is correct. >>>>> >> >>>>> >> >>>>> >> >> Would it be a big issue if we changed it to check if at least >>>>> >> "overdraft number of buffers are available", where "overdraft >>>>> number" is >>>>> >> configurable, instead of the currently hardcoded value of "1"? >>>>> >> >>>>> >> >>>>> >> These things resolve the different problems (at least as I see >>>>> that). >>>>> >> The current hardcoded "1" says that we switch "availability" to >>>>> >> "unavailability" when one more buffer is left(actually a little less >>>>> >> than one buffer since we write the last piece of data to this last >>>>> >> buffer). The overdraft feature doesn't change this logic we still >>>>> want >>>>> >> to switch to "unavailability" in such a way but if we are already in >>>>> >> "unavailability" and we want more buffers then we can take >>>>> "overdraft >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need >>>>> to >>>>> >> understand when we should switch to "unavailability" >>>>> >> >>>>> >> >>>>> >> -- About "reserve" vs "overdraft" >>>>> >> >>>>> >> As Fanrui mentioned above, perhaps, the best overdraft size will be >>>>> >> equal to parallelism. Also, the user can set any value he wants. So >>>>> even >>>>> >> if parallelism is small(~5) but the user's flatmap produces a lot of >>>>> >> data, the user can set 10 or even more. Which almost double the max >>>>> >> buffers and it will be impossible to reserve. At least we need to >>>>> figure >>>>> >> out how to protect from such cases (the limit for an overdraft?). 
So >>>>> >> actually it looks even more difficult than increasing the maximum >>>>> buffers. >>>>> >> >>>>> >> I want to emphasize that overdraft buffers are soft configuration >>>>> which >>>>> >> means it takes as many buffers as the global buffers pool has >>>>> >> available(maybe zero) but less than this configured value. It is >>>>> also >>>>> >> important to notice that perhaps, not many subtasks in TaskManager >>>>> will >>>>> >> be using this feature so we don't actually need a lot of available >>>>> >> buffers for every subtask(Here, I mean that if we have only one >>>>> >> window/flatmap operator and many other operators, then one >>>>> TaskManager >>>>> >> will have many ordinary subtasks which don't actually need >>>>> overdraft and >>>>> >> several subtasks that needs this feature). But in case of >>>>> reservation, >>>>> >> we will reserve some buffers for all operators even if they don't >>>>> really >>>>> >> need it. >>>>> >> >>>>> >> >>>>> >> -- Legacy source problem >>>>> >> >>>>> >> If we still want to change max buffers then it is problem for >>>>> >> LegacySources(since every subtask of source will always use these >>>>> >> overdraft). But right now, I think that we can force to set 0 >>>>> overdraft >>>>> >> buffers for legacy subtasks in configuration during execution(if it >>>>> is >>>>> >> not too late for changing configuration in this place). >>>>> >> >>>>> >> >>>>> >> 03.05.2022 14:11, rui fan пишет: >>>>> >>> Hi >>>>> >>> >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree with >>>>> >>> ignoring the legacy source, it will affect our design. User should >>>>> >>> use the new Source Api as much as possible. >>>>> >>> >>>>> >>> Hi Piotrek, we may still need to discuss whether the >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers >>>>> >>> in (exclusive + floating buffers)? They have some differences. 
>>>>> >>> >>>>> >>> If it uses extra buffers: >>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1 >>>>> >>> <= currentPoolSize) and all subpartitions don't reach the >>>>> >>> maxBuffersPerChannel. >>>>> >>> >>>>> >>> If it uses the buffers in (exclusive + floating buffers): >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions >>>>> >>> don't reach the maxBuffersPerChannel. >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), the >>>>> >>> usedBuffers will be small. That is the LocalBufferPool will be >>>>> >>> easily unavailable. For throughput, if users turn up the >>>>> >>> overdraft buffers, they need to turn up exclusive or floating >>>>> >>> buffers. It also affects the InputChannel, and it's is unfriendly >>>>> >>> to users. >>>>> >>> >>>>> >>> So I prefer the overdraft to use extra buffers. >>>>> >>> >>>>> >>> >>>>> >>> BTW, for watermark, the number of buffers it needs is >>>>> >>> numberOfSubpartitions. So if >>>>> overdraftBuffers=numberOfSubpartitions, >>>>> >>> the watermark won't block in requestMemory. But it has >>>>> >>> 2 problems: >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses >>>>> >>> (exclusive + floating buffers), there will be fewer buffers >>>>> >>> available. Throughput may be affected. >>>>> >>> 2. The numberOfSubpartitions is different for each Task. >>>>> >>> So if users want to cover watermark using this feature, >>>>> >>> they don't know how to set the overdraftBuffers more r >>>>> >>> easonably. And if the parallelism is changed, users still >>>>> >>> need to change overdraftBuffers. It is unfriendly to users. >>>>> >>> >>>>> >>> So I propose we support overdraftBuffers=-1, It means >>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions >>>>> >>> in the Constructor of LocalBufferPool. >>>>> >>> >>>>> >>> Please correct me if I'm wrong. 
>>>>> >>> >>>>> >>> Thanks >>>>> >>> fanrui >>>>> >>> >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski < >>>>> pnowoj...@apache.org> >>>>> >> wrote: >>>>> >>>> Hi fanrui, >>>>> >>>> >>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive >>>>> >> buffers * >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be >>>>> available >>>>> >>>> when >>>>> >>>>> (usedBuffers+overdraftBuffers <= >>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers) >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, >>>>> right? >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum >>>>> number of >>>>> >> the >>>>> >>>> required buffers, just as we did when we were implementing the non >>>>> >> blocking >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back >>>>> then we >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this >>>>> logic is >>>>> >>>> located >>>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition: >>>>> >>>> >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : >>>>> numSubpartitions + 1; >>>>> >>>> For performance reasons, we always require at least one buffer per >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we >>>>> >> require 5 >>>>> >>>> overdraft buffers for output to be available, we need to have >>>>> them on >>>>> >> top >>>>> >>>> of those "one buffer per sub-partition". So the logic should be >>>>> changed >>>>> >> to: >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : >>>>> numSubpartitions + >>>>> >>>> numOverdraftBuffers; >>>>> >>>> >>>>> >>>> Regarding increasing the number of max buffers I'm not sure. As >>>>> long as >>>>> >>>> "overdraft << max number of buffers", because all buffers on the >>>>> outputs >>>>> >>>> are shared across all sub-partitions. 
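[Editor's note] The change Piotr quotes from NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition amounts to the following. This is a standalone sketch of just that calculation, with the surrounding method omitted; passing numOverdraftBuffers = 1 reproduces today's hardcoded "+ 1".

```java
/** Sketch of the minimum-buffer calculation discussed in the thread. */
class MinBuffersSketch {
    static int minBuffers(
            boolean isSortShuffle,
            int sortShuffleMinBuffers,
            int numSubpartitions,
            int numOverdraftBuffers) {
        // One buffer per sub-partition for performance, plus the overdraft
        // buffers required for the output to count as available.
        return isSortShuffle
                ? sortShuffleMinBuffers
                : numSubpartitions + numOverdraftBuffers;
    }
}
```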
If we have 5 overdraft >>>>> buffers, >>>>> >> and >>>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of >>>>> things if >>>>> >> we >>>>> >>>> make the output available if at least one single buffer is >>>>> available or >>>>> >> at >>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So >>>>> effects of >>>>> >>>> increasing the overdraft from 1 to for example 5 should be >>>>> negligible. >>>>> >> For >>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 still >>>>> >> increases >>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max as >>>>> it is? >>>>> >>>> >>>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer >>>>> >> reserve" >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept in >>>>> >> reserve >>>>> >>>> in case of flatMap/firing timers/huge records"? >>>>> >>>> >>>>> >>>> What do you think Fenrui, Anton? >>>>> >>>> >>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the new >>>>> >> features, >>>>> >>>> as long as we don't brake the existing deployments too much. >>>>> >>>> >>>>> >>>> Best, >>>>> >>>> Piotrek >>>>> >>>> >>>>> >>>> wt., 3 maj 2022 o 09:20 Martijn Visser <mart...@ververica.com> >>>>> >> napisał(a): >>>>> >>>>> Hi everyone, >>>>> >>>>> >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources: >>>>> IMHO, we >>>>> >>>> should >>>>> >>>>> not focus too much on improving/adding capabilities for legacy >>>>> sources. >>>>> >>>> We >>>>> >>>>> want to persuade and push users to use the new Source API. Yes, >>>>> this >>>>> >>>> means >>>>> >>>>> that there's work required by the end users to port any custom >>>>> source >>>>> >> to >>>>> >>>>> the new interface. The benefits of the new Source API should >>>>> outweigh >>>>> >>>> this. >>>>> >>>>> Anything that we build to support multiple interfaces means >>>>> adding more >>>>> >>>>> complexity and more possibilities for bugs. 
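[Editor's note] The proportions in the argument above can be checked with quick arithmetic, assuming the pool sizing used in the thread (2 exclusive buffers per sub-partition plus 8 floating buffers; these defaults are an assumption taken from the quoted "100 * 2 + 8").

```java
/** Back-of-the-envelope check of the "overdraft << max buffers" argument. */
class OverdraftRatio {
    static int maxBuffers(int numSubpartitions) {
        // 2 exclusive buffers per sub-partition plus 8 floating buffers
        return numSubpartitions * 2 + 8;
    }

    /** Extra buffers reserved compared to today's hardcoded "1", as a fraction. */
    static double extraFraction(int numSubpartitions, int overdraft) {
        return (overdraft - 1) / (double) maxBuffers(numSubpartitions);
    }
}
```

At parallelism 100 an overdraft of 5 reserves under 2% of the ~208 buffers, and even at parallelism 5 it is roughly a quarter, matching the numbers quoted above.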
Let's try to make our >>>>> >> lives a >>>>> >>>>> little bit easier. >>>>> >>>>> >>>>> >>>>> Best regards, >>>>> >>>>> >>>>> >>>>> Martijn Visser >>>>> >>>>> https://twitter.com/MartijnVisser82 >>>>> >>>>> https://github.com/MartijnVisser >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> >>>>> wrote: >>>>> >>>>> >>>>> >>>>>> Hi Piotrek >>>>> >>>>>> >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep >>>>> using >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the >>>>> >>>> output? >>>>> >>>>>> I think yes, and please Anton Kalashnikov to help double check. >>>>> >>>>>> >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a >>>>> big >>>>> >>>>> problem >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already >>>>> have >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single >>>>> >>>> buffer >>>>> >>>>>>> available and this works the same for all tasks (including >>>>> legacy >>>>> >>>>> source >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if >>>>> at least >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft >>>>> number" >>>>> >>>>> is >>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"? >>>>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive >>>>> >>>> buffers * >>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be >>>>> available >>>>> >>>>> when >>>>> >>>>>> (usedBuffers+overdraftBuffers <= >>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers) >>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, >>>>> right? >>>>> >>>>>> >>>>> >>>>>> If yes, I think it can solve the problem of legacy source. >>>>> There may >>>>> >> be >>>>> >>>>>> some impact. 
If overdraftBuffers is large and only one buffer >>>>> is used >>>>> >>>> to >>>>> >>>>>> process a single record, exclusive buffers*parallelism + >>>>> floating >>>>> >>>> buffers >>>>> >>>>>> cannot be used. It may only be possible to use (exclusive >>>>> buffers * >>>>> >>>>>> parallelism >>>>> >>>>>> + floating buffers - overdraft buffers + 1). For throughput, if >>>>> turn >>>>> >> up >>>>> >>>>> the >>>>> >>>>>> overdraft buffers, the flink user needs to turn up exclusive or >>>>> >>>> floating >>>>> >>>>>> buffers. And it also affects the InputChannel. >>>>> >>>>>> >>>>> >>>>>> If not, I don't think it can solve the problem of legacy >>>>> source. The >>>>> >>>>> legacy >>>>> >>>>>> source don't check isAvailable, If there are the extra buffers, >>>>> legacy >>>>> >>>>>> source >>>>> >>>>>> will use them up until block in requestMemory. >>>>> >>>>>> >>>>> >>>>>> >>>>> >>>>>> Thanks >>>>> >>>>>> fanrui >>>>> >>>>>> >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski < >>>>> pnowoj...@apache.org> >>>>> >>>>>> wrote: >>>>> >>>>>> >>>>> >>>>>>> Hi, >>>>> >>>>>>> >>>>> >>>>>>> +1 for the general proposal from my side. It would be a nice >>>>> >>>> workaround >>>>> >>>>>>> flatMaps, WindowOperators and large records issues with >>>>> unaligned >>>>> >>>>>>> checkpoints. >>>>> >>>>>>> >>>>> >>>>>>>> The first task is about ignoring max buffers per channel. This >>>>> >>>> means >>>>> >>>>> if >>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the >>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just >>>>> ignore >>>>> >>>> that >>>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has >>>>> it(it is >>>>> >>>>>>>> actually not a overdraft). >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep >>>>> using >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the >>>>> >>>> output? >>>>> >>>>>>>> The second task is about the real overdraft. 
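[Editor's note] fanrui's point above, that a carved-out overdraft leaves only (exclusive buffers * parallelism + floating buffers - overdraft buffers + 1) usable for ordinary single-buffer records, can be encoded in a small sketch. The class and method names are illustrative, not Flink APIs.

```java
/**
 * Sketch: if the overdraft is carved out of the regular (exclusive + floating)
 * pool instead of being extra, part of the pool can no longer be used by
 * ordinary single-buffer record processing.
 */
class CarvedOutOverdraft {
    static int poolSize(int exclusivePerChannel, int parallelism, int floating) {
        return exclusivePerChannel * parallelism + floating;
    }

    /**
     * The pool is "available" only while usedBuffers + overdraft <= poolSize,
     * so at most poolSize - overdraft + 1 buffers get used for such records.
     */
    static int maxUsableForSingleBufferRecords(int poolSize, int overdraft) {
        return poolSize - overdraft + 1;
    }
}
```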
I am pretty >>>>> convinced >>>>> >>>>> now >>>>> >>>>>>>> that we, unfortunately, need configuration for limitation of >>>>> >>>>> overdraft >>>>> >>>>>>>> number(because it is not ok if one subtask allocates all >>>>> buffers of >>>>> >>>>> one >>>>> >>>>>>>> TaskManager considering that several different jobs can be >>>>> >>>> submitted >>>>> >>>>> on >>>>> >>>>>>>> this TaskManager). So idea is to have >>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per >>>>> >>>>>> LocalBufferPool). >>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool is >>>>> >>>> reached, >>>>> >>>>>>>> LocalBufferPool can request additionally from >>>>> NetworkBufferPool up >>>>> >>>> to >>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers. >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a >>>>> big >>>>> >>>>> problem >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we already >>>>> have >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single >>>>> >>>> buffer >>>>> >>>>>>> available and this works the same for all tasks (including >>>>> legacy >>>>> >>>>> source >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if >>>>> at least >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft >>>>> number" >>>>> >>>>> is >>>>> >>>>>>> configurable, instead of the currently hardcoded value of "1"? >>>>> >>>>>>> >>>>> >>>>>>> Best, >>>>> >>>>>>> Piotrek >>>>> >>>>>>> >>>>> >>>>>>> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> >>>>> napisał(a): >>>>> >>>>>>> >>>>> >>>>>>>> Let me add some information about the LegacySource. >>>>> >>>>>>>> >>>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource. >>>>> >>>>>>>> Could we add the enableOverdraft in LocalBufferPool? >>>>> >>>>>>>> The default value is false. 
If the getAvailableFuture is >>>>> called, >>>>> >>>>>>>> change enableOverdraft=true. It indicates whether there are >>>>> >>>>>>>> checks isAvailable elsewhere. >>>>> >>>>>>>> >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me >>>>> if >>>>> >>>> I'm >>>>> >>>>>>> wrong. >>>>> >>>>>>>> Thanks >>>>> >>>>>>>> fanrui >>>>> >>>>>>>> >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan < >>>>> 1996fan...@gmail.com> >>>>> >>>>> wrote: >>>>> >>>>>>>>> Hi, >>>>> >>>>>>>>> >>>>> >>>>>>>>> Thanks for your quick response. >>>>> >>>>>>>>> >>>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need to >>>>> >>>>> discuss >>>>> >>>>>>> the >>>>> >>>>>>>>> default value in PR. >>>>> >>>>>>>>> >>>>> >>>>>>>>> For the legacy source, you are right. It's difficult for >>>>> general >>>>> >>>>>>>>> implementation. >>>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in >>>>> >>>>>>>>> SourceFunction.SourceContext. And call it in our common >>>>> >>>>> LegacySource, >>>>> >>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume >>>>> >>>> kafka, >>>>> >>>>> so >>>>> >>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems. 
>>>>> >>>>>>>>> >>>>> >>>>>>>>> Core code: >>>>> >>>>>>>>> ``` >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() { >>>>> >>>>>>>>> if (recordWriter == null >>>>> >>>>>>>>> || >>>>> >>>>>>>>> >>>>> >> >>>>> !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, >>>>> >>>>>>>>> false) >>>>> >>>>>>>>> || recordWriter.isAvailable()) { >>>>> >>>>>>>>> return; >>>>> >>>>>>>>> } >>>>> >>>>>>>>> >>>>> >>>>>>>>> CompletableFuture<?> resumeFuture = >>>>> >>>>>>>> recordWriter.getAvailableFuture(); >>>>> >>>>>>>>> try { >>>>> >>>>>>>>> resumeFuture.get(); >>>>> >>>>>>>>> } catch (Throwable ignored) { >>>>> >>>>>>>>> } >>>>> >>>>>>>>> } >>>>> >>>>>>>>> ``` >>>>> >>>>>>>>> >>>>> >>>>>>>>> LegacySource calls >>>>> sourceContext.ensureRecordWriterIsAvailable() >>>>> >>>>>>>>> before synchronized (checkpointLock) and collects records. >>>>> >>>>>>>>> Please let me know if there is a better solution. >>>>> >>>>>>>>> >>>>> >>>>>>>>> Thanks >>>>> >>>>>>>>> fanrui >>>>> >>>>>>>>> >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov < >>>>> >>>>>> kaa....@yandex.com> >>>>> >>>>>>>>> wrote: >>>>> >>>>>>>>> >>>>> >>>>>>>>>> Hi. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or >>>>> two >>>>> >>>>>> commits >>>>> >>>>>>>> in a >>>>> >>>>>>>>>> PR? >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this >>>>> task has >>>>> >>>>>> fewer >>>>> >>>>>>>>>> questions but we should find a solution for LegacySource >>>>> first. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> -- 2. For the first task, if the flink user disables the >>>>> >>>>> Unaligned >>>>> >>>>>>>>>> Checkpoint, do we ignore max buffers per channel? >>>>> Because >>>>> >>>> the >>>>> >>>>>>>>>> overdraft >>>>> >>>>>>>>>> isn't useful for the Aligned Checkpoint, it still >>>>> needs to >>>>> >>>>> wait >>>>> >>>>>>> for >>>>> >>>>>>>>>> downstream Task to consume. 
>>>>> >>>>>>>>>> >>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As >>>>> I >>>>> >>>>>>>> understand, >>>>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it >>>>> doesn't >>>>> >>>>> make >>>>> >>>>>>> it >>>>> >>>>>>>>>> worse as well. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 3. For the second task >>>>> >>>>>>>>>> -- - The default value of >>>>> maxOverdraftBuffersPerPartition >>>>> >>>> may >>>>> >>>>>>> also >>>>> >>>>>>>>>> need >>>>> >>>>>>>>>> to be discussed. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> I think it should be a pretty small value or even 0 since it >>>>> >>>> kind >>>>> >>>>> of >>>>> >>>>>>>>>> optimization and user should understand what they >>>>> do(especially >>>>> >>>> if >>>>> >>>>>> we >>>>> >>>>>>>>>> implement the first task). >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, >>>>> can we >>>>> >>>>> set >>>>> >>>>>>> the >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the >>>>> overdraft >>>>> >>>>>> isn't >>>>> >>>>>>>>>> useful for >>>>> >>>>>>>>>> the Aligned Checkpoint. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> The same answer that above, if the overdraft doesn't make >>>>> >>>>>> degradation >>>>> >>>>>>>> for >>>>> >>>>>>>>>> the Aligned Checkpoint I don't think that we should make >>>>> >>>>> difference >>>>> >>>>>>>> between >>>>> >>>>>>>>>> AC and UC. >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> 4. For the legacy source >>>>> >>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up >>>>> to >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers. >>>>> >>>>>>>>>> - If disabling the UC, it doesn't use the overdraft >>>>> >>>> buffer. >>>>> >>>>>>>>>> - Do you think it's ok? >>>>> >>>>>>>>>> >>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at >>>>> all >>>>> >>>>> since >>>>> >>>>>>> it >>>>> >>>>>>>>>> can lead to undesirable results especially if the limit is >>>>> high. 
At least, as I understand it, LegacySource will always work in overdraft
mode and it will borrow maxOverdraftBuffersPerPartition buffers from the
global pool, which can lead to degradation of other subtasks on the same
TaskManager.

-- - Actually, we added the checkAvailable logic for LegacySource in our
internal version. It works well.

I don't really understand how that is possible for the general case,
considering that each user has their own implementation of
LegacySourceOperator.

-- 5. For the benchmark, do you have any suggestions? I submitted the
PR [1].

I haven't looked at it yet, but I'll try to do it soon.

29.04.2022 14:14, rui fan wrote:

Hi,

Thanks for your feedback. I have several questions.

1. Do you mean split this into two JIRAs or two PRs or two commits in
   a PR?
2. For the first task, if the flink user disables the Unaligned
   Checkpoint, do we ignore max buffers per channel? Because the
   overdraft isn't useful for the Aligned Checkpoint, it still needs to
   wait for the downstream Task to consume.
3. For the second task
   - The default value of maxOverdraftBuffersPerPartition may also need
     to be discussed.
   - If the user disables the Unaligned Checkpoint, can we set
     maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
     useful for the Aligned Checkpoint.
4. For the legacy source
   - If enabling the Unaligned Checkpoint, it uses up to
     maxOverdraftBuffersPerPartition buffers.
   - If disabling the UC, it doesn't use the overdraft buffer.
   - Do you think it's ok?
   - Actually, we added the checkAvailable logic for LegacySource in
     our internal version. It works well.
5. For the benchmark, do you have any suggestions? I submitted the
   PR [1].

[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi,

We discussed this a little with Dawid Wysakowicz. Here are some
conclusions:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel.
This means that if we request a memory segment from LocalBufferPool and
maxBuffersPerChannel is reached for this channel, we just ignore that and
continue to allocate buffers while LocalBufferPool has them (this is
actually not an overdraft).

The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need a configuration option to limit the
overdraft size (because it is not ok if one subtask allocates all buffers
of one TaskManager, considering that several different jobs can be
submitted to this TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per
LocalBufferPool). In this case, when the limit of buffers in
LocalBufferPool is reached, LocalBufferPool can additionally request up
to maxOverdraftBuffersPerPartition buffers from NetworkBufferPool.

But it is still not clear how to handle LegacySource, since it actually
works like an unlimited flatmap and it will always work in overdraft
mode, which is not the target. So we still need to think about that.
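[Editor's note: the two-pool interplay described above can be sketched as a toy model. The classes below are simplified stand-ins for Flink's real LocalBufferPool/NetworkBufferPool, and maxOverdraftBuffersPerPartition is the name proposed in this thread, not a finalized option.]

```java
// Toy model of the proposed overdraft: once the local pool reaches its
// regular limit, it may borrow up to maxOverdraftBuffersPerPartition
// extra segments from the global pool. Illustrative only.
public class OverdraftModel {

    static class NetworkBufferPool {
        int freeSegments;

        NetworkBufferPool(int freeSegments) {
            this.freeSegments = freeSegments;
        }

        boolean requestSegment() {
            if (freeSegments == 0) {
                return false;
            }
            freeSegments--;
            return true;
        }
    }

    static class LocalBufferPool {
        final NetworkBufferPool global;
        final int poolSize;                        // regular limit
        final int maxOverdraftBuffersPerPartition; // proposed overdraft limit
        int requested;                             // regular segments held
        int overdraft;                             // overdraft segments held

        LocalBufferPool(NetworkBufferPool global, int poolSize, int maxOverdraft) {
            this.global = global;
            this.poolSize = poolSize;
            this.maxOverdraftBuffersPerPartition = maxOverdraft;
        }

        /** Returns true if a segment could be obtained (regular or overdraft). */
        boolean requestSegment() {
            if (requested < poolSize && global.requestSegment()) {
                requested++;
                return true;
            }
            // Regular limit reached: fall back to the overdraft, if any is left.
            if (overdraft < maxOverdraftBuffersPerPartition && global.requestSegment()) {
                overdraft++;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        NetworkBufferPool global = new NetworkBufferPool(8);
        LocalBufferPool local = new LocalBufferPool(global, 4, 2);
        int granted = 0;
        while (local.requestSegment()) {
            granted++;
        }
        // 4 regular + 2 overdraft segments are granted, even though 8 were
        // free globally: the overdraft cap keeps one subtask from draining
        // the whole NetworkBufferPool.
        System.out.println("granted=" + granted + " overdraft=" + local.overdraft);
    }
}
```

The cap is exactly the point of the second task: without it, the fallback branch would loop until the global pool is empty.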
29.04.2022 11:11, rui fan wrote:

Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can apply for, right?

I prefer to hard-code the maxOverdraftBuffers so as not to add a new
configuration option. And I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

Hi Anton Kalashnikov,

Thanks for your very clear reply, I think you are totally right.
The 'maxBuffersNumber - buffersInUseNumber' can be used as the overdraft
buffer, so it won't need a new buffer configuration. Flink users can
turn up the maxBuffersNumber to control the overdraft buffer size.

Also, I'd like to add some information. For safety, we should limit the
maximum number of overdraft segments that each LocalBufferPool can apply
for.

Why do we limit it?
Some operators don't check `recordWriter.isAvailable` while processing
records, such as LegacySource. I mentioned it in FLINK-26759 [1]. I'm
not sure if there are other cases.
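[Editor's note: for illustration, here is a minimal, self-contained sketch of the availability hook from earlier in the thread as a legacy source's run loop might use it. The RecordWriter interface and LegacySourceSketch class are simplified stand-ins, not the actual Flink API.]

```java
import java.util.concurrent.CompletableFuture;

public class LegacySourceSketch {

    // Stand-in for the output side of a task; only the availability
    // signal matters for this sketch.
    interface RecordWriter {
        boolean isAvailable();
        CompletableFuture<?> getAvailableFuture();
    }

    // Mirrors the hook quoted earlier: block only when unaligned
    // checkpoints are enabled and the writer is back-pressured.
    static void ensureRecordWriterIsAvailable(RecordWriter recordWriter, boolean unalignedEnabled) {
        if (recordWriter == null || !unalignedEnabled || recordWriter.isAvailable()) {
            return;
        }
        try {
            recordWriter.getAvailableFuture().get();
        } catch (Exception ignored) {
        }
    }

    public static void main(String[] args) {
        Object checkpointLock = new Object();
        RecordWriter writer = new RecordWriter() {
            public boolean isAvailable() { return true; }
            public CompletableFuture<?> getAvailableFuture() {
                return CompletableFuture.completedFuture(null);
            }
        };
        for (int i = 0; i < 3; i++) {
            // The availability check happens *before* taking the checkpoint
            // lock, so a checkpoint can still proceed while the source waits.
            ensureRecordWriterIsAvailable(writer, true);
            synchronized (checkpointLock) {
                System.out.println("emit record " + i);
            }
        }
    }
}
```

Waiting outside the lock is what keeps the source from emitting unboundedly while back-pressured, which is the failure mode described above.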
If we don't add the limitation, the LegacySource will use up all the
remaining memory in the NetworkBufferPool when the backpressure is
severe.

How to limit it?
I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions` in the
constructor of LocalBufferPool. The maxOverdraftBuffers is just for
safety, and it should be enough for most flink jobs. Or we can set
`maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle some
jobs of low parallelism.

Also, if the user doesn't enable the Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because the
overdraft isn't useful for the Aligned Checkpoint.

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com> wrote:

Hi fanrui,

Thanks for creating the FLIP.

In general, I think the overdraft is a good idea and it should help in
the cases described above.
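[Editor's note: the hard-coded defaults rui fan floats a few paragraphs up can be compared side by side in a small sketch. The functions below are illustrative helpers, not the real LocalBufferPool constructor.]

```java
public class OverdraftDefaults {

    // Option A: tie the overdraft to the number of subpartitions.
    static int optionA(int numberOfSubpartitions) {
        return numberOfSubpartitions;
    }

    // Option B: the same, but with a floor of 10 for low-parallelism jobs.
    static int optionB(int numberOfSubpartitions) {
        return Math.max(numberOfSubpartitions, 10);
    }

    // In either option, disable the overdraft entirely when unaligned
    // checkpoints are off, since the overdraft only helps UC.
    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedEnabled) {
        return unalignedEnabled ? optionB(numberOfSubpartitions) : 0;
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(4, true));    // floor applies: 10
        System.out.println(maxOverdraftBuffers(128, true));  // 128
        System.out.println(maxOverdraftBuffers(128, false)); // 0
    }
}
```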
Here are my thoughts about configuration:

Please correct me if I am wrong, but as I understand it, right now we
have the following calculation.

maxBuffersNumber (per TaskManager) = Network memory (calculated via
taskmanager.memory.network.fraction, taskmanager.memory.network.min,
taskmanager.memory.network.max and total memory size) /
taskmanager.memory.segment-size.

requiredBuffersNumber (per TaskManager) = (exclusive buffers *
parallelism + floating buffers) * subtasks number in TaskManager

buffersInUseNumber = real number of buffers which are in use at the
current moment (always <= requiredBuffersNumber)

Ideally, requiredBuffersNumber should be equal to maxBuffersNumber,
which allows Flink to work predictably. But if requiredBuffersNumber >
maxBuffersNumber, sometimes it is also fine (but not good), since not
all required buffers are really mandatory (e.g. it is ok if Flink can
not allocate floating buffers).

But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
Flink just never uses these leftover buffers (maxBuffersNumber -
requiredBuffersNumber), which I propose to use.
(We can actually use even the difference 'requiredBuffersNumber -
buffersInUseNumber', since one TaskManager can contain several
operators, including 'window', which can temporarily borrow buffers from
the global pool.)

My proposal, more specifically (it relates only to requesting buffers
during processing of a single record, while switching to unavailability
between records should stay the same as we have it now):

* If one more buffer is requested but maxBuffersPerChannel is reached,
then just ignore this limitation and allocate this buffer from any
place (from LocalBufferPool if it still has something, otherwise from
NetworkBufferPool).

* If LocalBufferPool exceeds its limit, then temporarily allocate from
NetworkBufferPool while it has something to allocate.

Maybe I missed something and this solution won't work, but I like it
since, on the one hand, it works from scratch without any configuration,
and on the other hand, it can be configured by changing the proportion
of maxBuffersNumber and requiredBuffersNumber.
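[Editor's note: to make the calculation above concrete, here is a worked example with invented numbers (128 MiB of network memory, the default 32 KiB segment size, 2 exclusive buffers per channel, 8 floating buffers, parallelism 4, 3 subtasks); none of these figures come from the thread itself.]

```java
// Worked example of maxBuffersNumber and requiredBuffersNumber using
// the formulas quoted above, with made-up but plausible inputs.
public class BufferMath {

    static long maxBuffersNumber(long networkBytes, long segmentBytes) {
        return networkBytes / segmentBytes;
    }

    static int requiredBuffersNumber(int exclusive, int parallelism, int floating, int subtasks) {
        return (exclusive * parallelism + floating) * subtasks;
    }

    public static void main(String[] args) {
        long max = maxBuffersNumber(128L * 1024 * 1024, 32L * 1024); // 4096
        int required = requiredBuffersNumber(2, 4, 8, 3);            // 48
        // The leftover buffers (max - required) are what the proposal
        // would let a back-pressured subtask temporarily draw on.
        System.out.println("max=" + max + " required=" + required
                + " leftovers=" + (max - required));
    }
}
```

With these numbers the leftovers dwarf the required buffers, which is why Anton argues the gap can serve as an implicit overdraft budget without new configuration.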
The last thing that I want to say is that I don't really want to
implement new configuration, since even now it is not clear how to
correctly configure network buffers with the existing configuration, and
I don't want to complicate it further, especially if it is possible to
resolve the problem automatically (as described above).

So is my understanding about network memory/buffers correct?

--
Best regards,
Anton Kalashnikov

27.04.2022 07:46, rui fan wrote:

Hi everyone,

Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
effectively solves the problem of checkpoint timeouts or slow
checkpoints when backpressure is severe.

We found that UC (Unaligned Checkpoint) does not work well when the
backpressure is severe and multiple output buffers are required to
process a single record. FLINK-14396 [2] also mentioned this issue
before. So we propose the overdraft buffer to solve it.

I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft
buffer mechanism.
After discussing with Anton Kalashnikov, there are still some points to
discuss:

* There are already a lot of buffer-related configurations. Do we need
  to add a new configuration for the overdraft buffer?
* Where should the overdraft buffer take memory from?
  * If the overdraft buffer uses the memory remaining in the
    NetworkBufferPool, no new configuration needs to be added.
  * If adding a new configuration:
    o Should we set the overdraft-memory-size at the TM level or the
      Task level?
    o Or set overdraft-buffers to indicate the number of memory
      segments that can be overdrawn?
    o What is the default value? How to set sensible defaults?

Currently, I implemented a POC [5] and verified it using
flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
and the default value is 10. That is: each LocalBufferPool can overdraw
up to 10 memory segments.

Looking forward to your feedback!
Thanks,
fanrui

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-14396
[3] https://issues.apache.org/jira/browse/FLINK-26762
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
[6] https://github.com/apache/flink-benchmarks/pull/54

--
Best regards,
Anton Kalashnikov