Hi, I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think?
Can you create a parent ticket for the whole FLIP to group all of the issues together? Also, the FLIP should be officially voted on first. Best, Piotrek On Fri, 6 May 2022 at 09:08, rui fan <1996fan...@gmail.com> wrote: > Hi Anton, Piotrek and Dawid, > > Thanks for your help. > > I created FLINK-27522[1] as the first task. And I will finish it asap. > > @Piotrek, for the default value, do you think it should be less > than 5? What do you think about 3? Actually, I think 5 isn't big. > Whether it's 1, 3 or 5 doesn't matter much; the focus is on > reasonably resolving the deadlock problems. Or I can push the second > task forward first and we discuss the default value in the PR. > > For the legacySource, I got your idea. And I propose we create > the third task to handle it, because it is independent and exists for > compatibility with the old API. What do you think? I updated > the third task on FLIP-227[2]. > > If all is ok, I will create a JIRA for the third task and add it to > FLIP-227. And I will develop them in order, from the first task to the > third task. > > Thanks again for your help. > > [1] https://issues.apache.org/jira/browse/FLINK-27522 > [2] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > Thanks > fanrui > > On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org> > wrote: > > > Hi fanrui, > > > > > How to identify legacySource? > > > > Legacy sources always use the SourceStreamTask class, and > > SourceStreamTask is used only for legacy sources. But I'm not sure how to > > enable/disable that. Adding a `disableOverdraft()` call in SourceStreamTask > > would be better compared to relying on the `getAvailableFuture()` call > > (isn't it used for the back pressure metric anyway?). Ideally we should > > enable/disable it in the constructors, but that might be tricky. 
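The constructor idea above could look roughly like the following minimal model. This is a sketch only, assuming simplified stand-ins: `PoolModel`, `SourceStreamTaskModel` and the proposed `disableOverdraft()` are hypothetical names, not existing Flink classes or methods.

```java
// Minimal model of the idea discussed above: legacy sources always run
// through SourceStreamTask, so its constructor could switch the overdraft
// off for its output buffer pool. All names here are hypothetical stand-ins
// for Flink's SourceStreamTask / LocalBufferPool, not the real classes.
public class LegacySourceOverdraftSketch {

    /** Stand-in for LocalBufferPool with the proposed overdraft setting. */
    static final class PoolModel {
        private int maxOverdraftBuffers;

        PoolModel(int maxOverdraftBuffers) {
            this.maxOverdraftBuffers = maxOverdraftBuffers;
        }

        /** The proposed disableOverdraft() call. */
        void disableOverdraft() {
            this.maxOverdraftBuffers = 0;
        }

        int getMaxOverdraftBuffers() {
            return maxOverdraftBuffers;
        }
    }

    /** Stand-in for SourceStreamTask: used only by legacy sources. */
    static final class SourceStreamTaskModel {
        SourceStreamTaskModel(PoolModel outputPool) {
            // Disable in the constructor, rather than inferring "legacy-ness"
            // from whether getAvailableFuture() is ever called.
            outputPool.disableOverdraft();
        }
    }

    public static void main(String[] args) {
        PoolModel pool = new PoolModel(5);
        new SourceStreamTaskModel(pool);
        System.out.println("overdraft for legacy source task: " + pool.getMaxOverdraftBuffers());
    }
}
```

The point of the sketch is only that the decision happens once, at task construction time, instead of being inferred later from runtime calls.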
> > > > > I prefer it to be between 5 and 10 > > > > I would vote for a smaller value because of FLINK-13203 > > > > Piotrek > > > > > > > > On Thu, 5 May 2022 at 11:49, rui fan <1996fan...@gmail.com> wrote: > > > >> Hi, > >> > >> Thanks a lot for your discussion. > >> > >> After several discussions, I think it's clear now. I updated the > >> "Proposed Changes" of FLIP-227[1]. If I have missed something, > >> please help to add it to the FLIP, or add it in the mail > >> and I can add it to the FLIP. If everything is OK, I will create a > >> new JIRA for the first task, and use FLINK-26762[2] as the > >> second task. > >> > >> About the legacy source, do we set maxOverdraftBuffersPerGate=0 > >> directly? How do we identify a legacySource? Or could we add > >> an overdraftEnabled flag in LocalBufferPool? The default value > >> is false. If the getAvailableFuture is called, change > >> overdraftEnabled=true. > >> It indicates whether isAvailable is checked elsewhere. > >> It might be more general, and it can cover more cases. > >> > >> Also, I think the default value of 'max-overdraft-buffers-per-gate' > >> needs to be confirmed. I prefer it to be between 5 and 10. What > >> do you think? > >> > >> [1] > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > >> [2] https://issues.apache.org/jira/browse/FLINK-26762 > >> > >> Thanks > >> fanrui > >> > >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> > >> wrote: > >> > >>> Hi again, > >>> > >>> After sleeping on this, if both versions (reserve and overdraft) have > >>> the same complexity, I would also prefer the overdraft. > >>> > >>> > `Integer.MAX_VALUE` as the default value was my idea as well but now, as > >>> > Dawid mentioned, I think it is dangerous since it is too implicit for > >>> > the user, and if the user submits one more job to the same TaskManager > >>> > >>> As I mentioned, it's not only an issue with multiple jobs. 
The same > >>> problem can happen with different subtasks from the same job, > potentially > >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I > would be > >>> in favour of Integer.MAX_VALUE as the default value, but as it is, I > >>> think we should indeed play on the safe side and limit it. > >>> > >>> > I still don't understand how the "reserve" implementation should be > >>> limited. > >>> > I mean if we have X buffers in total and the user sets the overdraft > equal > >>> > to X, we obviously can not reserve all buffers, but how many are we > >>> > allowed to reserve? Should it be a different configuration like > >>> > percentageForReservedBuffers? > >>> > >>> The reserve could be defined as a percentage, or as a fixed number of > >>> buffers. But yes, in normal operation a subtask would not use the > reserve: > >>> if numberOfAvailableBuffers < reserve, the output would not be > available. > >>> Only in the flatMap/timers/huge records case could the reserve be used. > >>> > >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, > >>> will LocalBufferPool never be available? Can't process data? > >>> > >>> Of course we would need to make sure that never happens. So the reserve > >>> should be < total buffer size. > >>> > >>> > 2. If the overdraft buffer uses the extra buffers, when the downstream > >>> > task inputBuffer is insufficient, it should fail to start the job, > and > >>> then > >>> > restart? When the InputBuffer is initialized, it will apply for > enough > >>> > buffers, right? > >>> > >>> The failover when the downstream can not allocate buffers is already > >>> implemented in FLINK-14872 [2]. There is a timeout for how long the task > is > >>> waiting for buffer allocation. However this doesn't prevent many > >>> (potentially infinitely many) deadlock/restart cycles. IMO the proper > >>> solution for [1] would be 2b described in the ticket: > >>> > >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. 
This > >>> is a simplified version of 2a, without tracking the tasks > sink-to-source. > >>> > >>> But that's a pre-existing problem and I don't think we have to solve it > >>> before implementing the overdraft. I think we would need to solve it only > >>> before setting Integer.MAX_VALUE as the default for the overdraft. > Maybe I > >>> would hesitate to set the overdraft to anything more than a couple of > >>> buffers by default for the same reason. > >>> > >>> > Actually, I totally agree that we don't need a lot of buffers for > >>> overdraft > >>> > >>> and > >>> > >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. > >>> > When we finish this feature and after users use it, if users report > >>> > this issue we can discuss it again. > >>> > >>> +1 > >>> > >>> Piotrek > >>> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13203 > >>> [2] https://issues.apache.org/jira/browse/FLINK-14872 > >>> > >>> On Thu, 5 May 2022 at 05:52, rui fan <1996fan...@gmail.com> wrote: > >>> > >>>> Hi everyone, > >>>> > >>>> I still have some questions. > >>>> > >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, > will > >>>> LocalBufferPool never be available? Can't process data? > >>>> 2. If the overdraft buffer uses the extra buffers, when the downstream > >>>> task inputBuffer is insufficient, it should fail to start the job, and > >>>> then > >>>> restart? When the InputBuffer is initialized, it will apply for enough > >>>> buffers, right? > >>>> > >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. > >>>> When we finish this feature and after users use it, if users report > >>>> this issue we can discuss it again. > >>>> > >>>> Thanks > >>>> fanrui > >>>> > >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz < > dwysakow...@apache.org> > >>>> wrote: > >>>> > >>>>> Hey all, > >>>>> > >>>>> I have not replied in the thread yet, but I was following the > >>>>> discussion. 
> >>>>> > >>>>> Personally, I like Fanrui's and Anton's idea. As far as I understand > >>>>> it > >>>>> the idea to distinguish between inside flatMap & outside would be > >>>>> fairly > >>>>> simple, but maybe slightly indirect. The checkAvailability would > >>>>> remain > >>>>> unchanged and it is checked always between separate invocations of > the > >>>>> UDF. Therefore the overdraft buffers would not apply there. However > >>>>> once > >>>>> the pool says it is available, it means it has at least an initial > >>>>> buffer. So any additional request without checking for availability > >>>>> can > >>>>> be considered to be inside of processing a single record. This does > >>>>> not > >>>>> hold just for the LegacySource as I don't think it actually checks > for > >>>>> the availability of buffers in the LocalBufferPool. > >>>>> > >>>>> In the offline chat with Anton, we also discussed if we need a limit > >>>>> of > >>>>> the number of buffers we could overdraft (or in other words if the > >>>>> limit > >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to > >>>>> stay > >>>>> on the safe side and have it limited. The pool of network buffers is > >>>>> shared for the entire TaskManager, so it means it can be shared even > >>>>> across tasks of separate jobs. However, I might be just unnecessarily > >>>>> cautious here. > >>>>> > >>>>> Best, > >>>>> > >>>>> Dawid > >>>>> > >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote: > >>>>> > Hi, > >>>>> > > >>>>> > Thanks for the answers. > >>>>> > > >>>>> >> we may still need to discuss whether the > >>>>> >> overdraft/reserve/spare should use extra buffers or buffers > >>>>> >> in (exclusive + floating buffers)? > >>>>> > and > >>>>> > > >>>>> >> These things resolve the different problems (at least as I see > >>>>> that). 
> >>>>> >> The current hardcoded "1" says that we switch "availability" to > >>>>> >> "unavailability" when one more buffer is left (actually a little less > >>>>> >> than one buffer, since we write the last piece of data to this last > >>>>> >> buffer). The overdraft feature doesn't change this logic; we still > >>>>> want > >>>>> >> to switch to "unavailability" in such a way, but if we are already in > >>>>> >> "unavailability" and we want more buffers then we can take > >>>>> "overdraft > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need > >>>>> to > >>>>> >> understand when we should switch to "unavailability" > >>>>> > Ok, I see. So it seems to me that both of you have in mind keeping > >>>>> the > >>>>> > buffer pools as they are right now, but if we are in the middle of > >>>>> > processing a record, we can request extra overdraft buffers on top of > >>>>> > those? This is a different way to implement the overdraft from what I was > >>>>> > thinking. I was thinking about something like keeping the > >>>>> "overdraft", or > >>>>> > more precisely a buffer "reserve", in the buffer pool. I think my > >>>>> version > >>>>> > would be easier to implement, because it is just fiddling with the > >>>>> min/max > >>>>> > buffer calculation and slightly modified `checkAvailability()` > >>>>> logic. > >>>>> > > >>>>> > On the other hand, what you have in mind would better utilise the > >>>>> available > >>>>> > memory, right? It would require more code changes (how would we know > when > >>>>> > we are allowed to request the overdraft?). However, in this case, I > >>>>> would > >>>>> > be tempted to set the number of overdraft buffers by default to > >>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers as > >>>>> > necessary. 
The only downside that I can think of (apart from higher > >>>>> > complexity) would be a higher chance of hitting a known/unsolved > >>>>> deadlock [1] > >>>>> > in a scenario: > >>>>> > - downstream task hasn't yet started > >>>>> > - upstream task requests overdraft and uses all available memory > >>>>> segments > >>>>> > from the global pool > >>>>> > - upstream task is blocked, because downstream task hasn't started > >>>>> yet and > >>>>> > can not consume any data > >>>>> > - downstream task tries to start, but can not, as there are no > >>>>> available > >>>>> > buffers > >>>>> > > >>>>> >> BTW, for watermark, the number of buffers it needs is > >>>>> >> numberOfSubpartitions. So if > overdraftBuffers=numberOfSubpartitions, > >>>>> >> the watermark won't block in requestMemory. > >>>>> > and > >>>>> > > >>>>> >> the best overdraft size will be equal to parallelism. > >>>>> > That's a lot of buffers. I don't think we need that many for > >>>>> broadcasting > >>>>> > watermarks. Watermarks are small, and remember that every > >>>>> subpartition has > >>>>> > some partially filled/empty WIP buffer, so the vast majority of > >>>>> > subpartitions will not need to request a new buffer. > >>>>> > > >>>>> > Best, > >>>>> > Piotrek > >>>>> > > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203 > >>>>> > > >>>>> > On Tue, 3 May 2022 at 17:15, Anton Kalashnikov <kaa....@yandex.com> > >>>>> wrote: > >>>>> > > >>>>> >> Hi, > >>>>> >> > >>>>> >> > >>>>> >> >> Do you mean to ignore it while processing records, but keep > >>>>> using > >>>>> >> `maxBuffersPerChannel` when calculating the availability of the > >>>>> output? > >>>>> >> > >>>>> >> > >>>>> >> Yes, it is correct. > >>>>> >> > >>>>> >> > >>>>> >> >> Would it be a big issue if we changed it to check if at least > >>>>> >> "overdraft number of buffers are available", where "overdraft > >>>>> number" is > >>>>> >> configurable, instead of the currently hardcoded value of "1"? 
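The availability semantics being debated here can be sketched with a small model. This is a toy model of the behaviour discussed in the thread, not Flink's actual `LocalBufferPool`: the between-records check keeps the existing hardcoded "1", while requests made in the middle of processing a record may draw up to a configurable number of overdraft buffers.

```java
// Toy model of the overdraft semantics discussed in this thread; it is NOT
// Flink's LocalBufferPool. Between records the pool is "available" only if a
// regular buffer is free (the existing hardcoded "1"); while processing a
// single record, up to maxOverdraftBuffers extra segments may be drawn.
public class OverdraftPoolModel {
    private final int currentPoolSize;     // regular buffers owned by the pool
    private final int maxOverdraftBuffers; // configurable overdraft limit
    private int usedBuffers;

    public OverdraftPoolModel(int currentPoolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Check used between records: unchanged by the overdraft feature. */
    public boolean isAvailable() {
        return usedBuffers + 1 <= currentPoolSize;
    }

    /** Request made while already processing a record: may overdraft. */
    public boolean tryRequestBufferDuringRecord() {
        if (usedBuffers < currentPoolSize + maxOverdraftBuffers) {
            usedBuffers++;
            return true;
        }
        return false; // the real task would block here instead
    }

    public static void main(String[] args) {
        OverdraftPoolModel pool = new OverdraftPoolModel(2, 3);
        System.out.println("available between records: " + pool.isAvailable());
    }
}
```

In this model a subtask that is not mid-record never touches the overdraft, which is the property both sides of the discussion want to preserve.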
> >>>>> >> > >>>>> >> > >>>>> >> These things resolve the different problems (at least as I see > >>>>> that). > >>>>> >> The current hardcoded "1" says that we switch "availability" to > >>>>> >> "unavailability" when one more buffer is left (actually a little less > >>>>> >> than one buffer, since we write the last piece of data to this last > >>>>> >> buffer). The overdraft feature doesn't change this logic; we still > >>>>> want > >>>>> >> to switch to "unavailability" in such a way, but if we are already > in > >>>>> >> "unavailability" and we want more buffers then we can take > >>>>> "overdraft > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we need > >>>>> to > >>>>> >> understand when we should switch to "unavailability" > >>>>> >> > >>>>> >> > >>>>> >> -- About "reserve" vs "overdraft" > >>>>> >> > >>>>> >> As Fanrui mentioned above, perhaps, the best overdraft size will > be > >>>>> >> equal to parallelism. Also, the user can set any value he wants. > So > >>>>> even > >>>>> >> if parallelism is small (~5) but the user's flatmap produces a lot > of > >>>>> >> data, the user can set 10 or even more. That would almost double the > max > >>>>> >> buffers, and it will be impossible to reserve. At least we need to > >>>>> figure > >>>>> >> out how to protect from such cases (the limit for an overdraft?). > So > >>>>> >> actually it looks even more difficult than increasing the maximum > >>>>> buffers. > >>>>> >> > >>>>> >> I want to emphasize that overdraft buffers are a soft configuration, > >>>>> which > >>>>> >> means a subtask takes as many buffers as the global buffer pool has > >>>>> >> available (maybe zero) but less than this configured value. 
It is > >>>>> also > >>>>> >> important to notice that perhaps not many subtasks in a TaskManager > >>>>> will > >>>>> >> be using this feature, so we don't actually need a lot of available > >>>>> >> buffers for every subtask (here, I mean that if we have only one > >>>>> >> window/flatmap operator and many other operators, then one > >>>>> TaskManager > >>>>> >> will have many ordinary subtasks which don't actually need > >>>>> overdraft and > >>>>> >> several subtasks that need this feature). But in case of > >>>>> reservation, > >>>>> >> we will reserve some buffers for all operators even if they don't > >>>>> really > >>>>> >> need it. > >>>>> >> > >>>>> >> > >>>>> >> -- Legacy source problem > >>>>> >> > >>>>> >> If we still want to change max buffers then it is a problem for > >>>>> >> LegacySources (since every subtask of a source will always use these > >>>>> >> overdraft buffers). But right now, I think that we can force 0 > >>>>> overdraft > >>>>> >> buffers for legacy subtasks in the configuration during execution (if > it > >>>>> is > >>>>> >> not too late to change the configuration at this point). > >>>>> >> > >>>>> >> > >>>>> >> On 03.05.2022 14:11, rui fan wrote: > >>>>> >>> Hi > >>>>> >>> > >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree with > >>>>> >>> ignoring the legacy source; otherwise it will affect our design. Users > should > >>>>> >>> use the new Source API as much as possible. > >>>>> >>> > >>>>> >>> Hi Piotrek, we may still need to discuss whether the > >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers > >>>>> >>> in (exclusive + floating buffers)? They have some differences. > >>>>> >>> > >>>>> >>> If it uses extra buffers: > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + 1 > >>>>> >>> <= currentPoolSize) and all subpartitions don't reach the > >>>>> >>> maxBuffersPerChannel. > >>>>> >>> > >>>>> >>> If it uses the buffers in (exclusive + floating buffers): > >>>>> >>> 1. 
The LocalBufferPool will be available when (usedBuffers + > >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions > >>>>> >>> don't reach the maxBuffersPerChannel. > >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large (>8), > the > >>>>> >>> usedBuffers will be small. That is, the LocalBufferPool will > >>>>> >>> easily become unavailable. For throughput, if users turn up the > >>>>> >>> overdraft buffers, they need to turn up exclusive or floating > >>>>> >>> buffers. It also affects the InputChannel, and it's unfriendly > >>>>> >>> to users. > >>>>> >>> > >>>>> >>> So I prefer the overdraft to use extra buffers. > >>>>> >>> > >>>>> >>> > >>>>> >>> BTW, for watermark, the number of buffers it needs is > >>>>> >>> numberOfSubpartitions. So if > >>>>> overdraftBuffers=numberOfSubpartitions, > >>>>> >>> the watermark won't block in requestMemory. But it has > >>>>> >>> 2 problems: > >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses > >>>>> >>> (exclusive + floating buffers), there will be fewer buffers > >>>>> >>> available. Throughput may be affected. > >>>>> >>> 2. The numberOfSubpartitions is different for each Task. > >>>>> >>> So if users want to cover watermarks using this feature, > >>>>> >>> they don't know how to set the overdraftBuffers more > >>>>> >>> reasonably. And if the parallelism is changed, users still > >>>>> >>> need to change overdraftBuffers. It is unfriendly to users. > >>>>> >>> > >>>>> >>> So I propose we support overdraftBuffers=-1. It means > >>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions > >>>>> >>> in the constructor of LocalBufferPool. > >>>>> >>> > >>>>> >>> Please correct me if I'm wrong. > >>>>> >>> > >>>>> >>> Thanks > >>>>> >>> fanrui > >>>>> >>> > >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski < > >>>>> pnowoj...@apache.org> > >>>>> >> wrote: > >>>>> >>>> Hi fanrui, > >>>>> >>>> > >>>>> >>>>> Do you mean don't add the extra buffers? 
We just use (exclusive > >>>>> >> buffers * > >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be > >>>>> available > >>>>> >>>> when > >>>>> >>>>> (usedBuffers+overdraftBuffers <= > >>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers) > >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, > >>>>> right? > >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum > >>>>> number of > >>>>> >> the > >>>>> >>>> required buffers, just as we did when we were implementing the > non > >>>>> >> blocking > >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back > >>>>> then we > >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this > >>>>> logic is > >>>>> >>>> located > >>>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition: > >>>>> >>>> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : > >>>>> numSubpartitions + 1; > >>>>> >>>> For performance reasons, we always require at least one buffer > per > >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if > we > >>>>> >> require 5 > >>>>> >>>> overdraft buffers for output to be available, we need to have > >>>>> them on > >>>>> >> top > >>>>> >>>> of those "one buffer per sub-partition". So the logic should be > >>>>> changed > >>>>> >> to: > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : > >>>>> numSubpartitions + > >>>>> >>>> numOverdraftBuffers; > >>>>> >>>> > >>>>> >>>> Regarding increasing the number of max buffers I'm not sure. As > >>>>> long as > >>>>> >>>> "overdraft << max number of buffers", because all buffers on the > >>>>> outputs > >>>>> >>>> are shared across all sub-partitions. 
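The adjusted minimum-buffer calculation quoted above can be written out as a small sketch. This is a hedged illustration modeled on the quoted `NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition` line; the method name, the illustrative constant, and the simplified signature are stand-ins, not Flink's real code.

```java
// Sketch of the adjusted minimum calculation discussed above (modeled on the
// quoted NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition line;
// names and the constant are simplified stand-ins, not Flink's real API).
public class MinBuffersSketch {
    static final int SORT_SHUFFLE_MIN_BUFFERS = 64; // illustrative value only

    static int minBuffersPerResultPartition(
            boolean isSortShuffle, int numSubpartitions, int numOverdraftBuffers) {
        // One buffer per sub-partition for throughput, plus the overdraft
        // buffers on top (previously the hardcoded "+ 1").
        return isSortShuffle
                ? SORT_SHUFFLE_MIN_BUFFERS
                : numSubpartitions + numOverdraftBuffers;
    }

    public static void main(String[] args) {
        // e.g. parallelism 100 with 5 overdraft buffers
        System.out.println(minBuffersPerResultPartition(false, 100, 5));
    }
}
```

With the old logic the second argument of the ternary was `numSubpartitions + 1`; the sketch just generalizes that "+ 1" to the configured overdraft count.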
If we have 5 overdraft > >>>> buffers, > >> and > >>>> parallelism of 100, it doesn't matter in the grand scheme of > >>>> things if > >> we > >>>> make the output available if at least one single buffer is > >>>> available or > >> at > >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So > >>>> the effects of > >>>> increasing the overdraft from 1 to, for example, 5 should be > >>>> negligible. > >> For > >>>> small parallelism, like 5, increasing the overdraft from 1 to 5 > still > >> increases > >>>> the overdraft by only about 25%. So maybe we can keep the max as > >>>> it is? > >>>> > >>>> If so, maybe we should change the name from "overdraft" to > "buffer > >> reserve" > >>>> or "spare buffers"? And document it as "number of buffers kept > in > >> reserve > >>>> in case of flatMap/firing timers/huge records"? > >>>> > >>>> What do you think, Fanrui, Anton? > >>>> > >>>> Re LegacySources. I agree we can kind of ignore them in the new > >> features, > >>>> as long as we don't break the existing deployments too much. > >>>> > >>>> Best, > >>>> Piotrek > >>>> > >>>> On Tue, 3 May 2022 at 09:20, Martijn Visser <mart...@ververica.com> > >> wrote: > >>>>> Hi everyone, > >>>>> > >>>>> Just wanted to chip in on the discussion of legacy sources: > >>>>> IMHO, we > >>>> should > >>>>> not focus too much on improving/adding capabilities for legacy > >>>>> sources. > >>>> We > >>>>> want to persuade and push users to use the new Source API. Yes, > >>>>> this > >>>> means > >>>>> that there's work required by the end users to port any custom > >>>>> source > >> to > >>>>> the new interface. The benefits of the new Source API should > >>>>> outweigh > >>>> this. 
> >>>>> >>>>> Anything that we build to support multiple interfaces means > >>>>> adding more > >>>>> >>>>> complexity and more possibilities for bugs. Let's try to make > our > >>>>> >> lives a > >>>>> >>>>> little bit easier. > >>>>> >>>>> > >>>>> >>>>> Best regards, > >>>>> >>>>> > >>>>> >>>>> Martijn Visser > >>>>> >>>>> https://twitter.com/MartijnVisser82 > >>>>> >>>>> https://github.com/MartijnVisser > >>>>> >>>>> > >>>>> >>>>> > >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> > >>>>> wrote: > >>>>> >>>>> > >>>>> >>>>>> Hi Piotrek > >>>>> >>>>>> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep > >>>>> using > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of > the > >>>>> >>>> output? > >>>>> >>>>>> I think yes, and please Anton Kalashnikov to help double > check. > >>>>> >>>>>> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it a > >>>>> big > >>>>> >>>>> problem > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we > already > >>>>> have > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a > single > >>>>> >>>> buffer > >>>>> >>>>>>> available and this works the same for all tasks (including > >>>>> legacy > >>>>> >>>>> source > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check if > >>>>> at least > >>>>> >>>>>>> "overdraft number of buffers are available", where "overdraft > >>>>> number" > >>>>> >>>>> is > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of > "1"? > >>>>> >>>>>> Do you mean don't add the extra buffers? We just use > (exclusive > >>>>> >>>> buffers * > >>>>> >>>>>> parallelism + floating buffers)? 
The LocalBufferPool will be > >>>>> available > >>>>> when > >>>>>> (usedBuffers+overdraftBuffers <= > >>>>>> exclusiveBuffers*parallelism+floatingBuffers) > >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, > >>>>> right? > >>>>>> > >>>>>> If yes, I think it can solve the legacy source problem. > >>>>> There may > >> be > >>>>>> some impact. If overdraftBuffers is large and only one buffer > >>>>> is used > >>>> to > >>>>>> process a single record, exclusive buffers*parallelism + > >>>>> floating > >>>> buffers > >>>>>> cannot all be used. It may only be possible to use (exclusive > >>>>> buffers * > >>>>>> parallelism > >>>>>> + floating buffers - overdraft buffers + 1). For throughput, > if > >>>>> the overdraft buffers are turned > up, > >>>>> the > >>>>>> Flink user needs to turn up exclusive > or > >>>> floating > >>>>>> buffers. And it also affects the InputChannel. > >>>>>> > >>>>>> If not, I don't think it can solve the legacy source problem. > >>>>> The > >>>>> legacy > >>>>>> source doesn't check isAvailable. If there are extra > buffers, > >>>>> the legacy > >>>>>> source > >>>>>> will use them up until it blocks in requestMemory. > >>>>>> > >>>>>> > >>>>>> Thanks > >>>>>> fanrui > >>>>>> > >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski < > >>>>> pnowoj...@apache.org> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi, > >>>>>>> > >>>>>>> +1 for the general proposal from my side. It would be a nice > >>>> workaround > >>>>>>> for the flatMap, WindowOperator and large-record issues with > >>>> unaligned > >>>>>>> checkpoints. > >>>>>>> > >>>>>>>> The first task is about ignoring max buffers per channel. 
This > >>>> means > >>>>> if > >>>>>>>> we request a memory segment from LocalBufferPool and the > >>>>>>>> maxBuffersPerChannel is reached for this channel, we just > >>>>> ignore > >>>> that > >>>>>>>> and continue to allocate buffers while the LocalBufferPool has > >>>>>>>> them (it is > >>>>>>>> actually not an overdraft). > >>>>>>> Do you mean to ignore it while processing records, but keep > >>>>> using > >>>>>>> `maxBuffersPerChannel` when calculating the availability of > the > >>>> output? > >>>>>>>> The second task is about the real overdraft. I am pretty > convinced > >>>>> now > >>>>>>>> that we, unfortunately, need a configuration option to limit the > >>>>> overdraft > >>>>>>>> number (because it is not ok if one subtask allocates all > buffers of > >>>>> one > >>>>>>>> TaskManager considering that several different jobs can be > >>>> submitted > >>>>> on > >>>>>>>> this TaskManager). So the idea is to have > >>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per > >>>>>> LocalBufferPool). > >>>>>>>> In this case, when the limit of buffers in the LocalBufferPool is > >>>> reached, > >>>>>>>> the LocalBufferPool can request additionally from the > NetworkBufferPool up > >>>> to > >>>>>>>> maxOverdraftBuffersPerPartition buffers. > >>>>>>> +1 for just having this as a separate configuration. Is it a > >>>>> big > >>>>> problem > >>>>>>> that legacy sources would be ignoring it? Note that we > already > >>>>> have > >>>>>>> effectively hardcoded a single overdraft buffer. > >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a > single > >>>> buffer > >>>>>>> available and this works the same for all tasks (including > >>>>> legacy > >>>>> source > >>>>>>> tasks). 
Would it be a big issue if we changed it to check if > >>>>> at least > >>>>>>> "overdraft number of buffers are available", where "overdraft > >>>>> number" > >>>>> is > >>>>>>> configurable, instead of the currently hardcoded value of > "1"? > >>>>>>> > >>>>>>> Best, > >>>>>>> Piotrek > >>>>>>> > >>>>>>> On Fri, 29 Apr 2022 at 17:04, rui fan <1996fan...@gmail.com> > >>>>> wrote: > >>>>>>> > >>>>>>>> Let me add some information about the LegacySource. > >>>>>>>> > >>>>>>>> If we want to disable the overdraft buffer for the LegacySource, > >>>>>>>> could we add an enableOverdraft flag in LocalBufferPool? > >>>>>>>> The default value is false. If the getAvailableFuture is > >>>>> called, > >>>>>>>> change enableOverdraft=true. It indicates whether > >>>>>>>> isAvailable is checked elsewhere. > >>>>>>>> > >>>>>>>> I don't think it is elegant, but it's safe. Please correct > me > >>>>> if > >>>> I'm > >>>>>>> wrong. > >>>>>>>> Thanks > >>>>>>>> fanrui > >>>>>>>> > >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan < > >>>>> 1996fan...@gmail.com> > >>>>> wrote: > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> Thanks for your quick response. > >>>>>>>>> > >>>>>>>>> For question 1/2/3, we think they are clear. We just need > to > >>>>> discuss > >>>>>>> the > >>>>>>>>> default value in the PR. > >>>>>>>>> > >>>>>>>>> For the legacy source, you are right. A general > >>>>>>>>> implementation is difficult. > >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in > >>>>>>>>> SourceFunction.SourceContext. And call it in our common > >>>>> LegacySource, > >>>>>>>>> e.g. FlinkKafkaConsumer. 
Over 90% of our Flink jobs consume > >>>> Kafka, > so > >>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems. > >>>>>>>>> > >>>>>>>>> Core code: > >>>>>>>>> ```
> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
> >>>>>>>>>     if (recordWriter == null
> >>>>>>>>>             || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
> >>>>>>>>>             || recordWriter.isAvailable()) {
> >>>>>>>>>         return;
> >>>>>>>>>     }
> >>>>>>>>>
> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
> >>>>>>>>>     try {
> >>>>>>>>>         resumeFuture.get();
> >>>>>>>>>     } catch (Throwable ignored) {
> >>>>>>>>>     }
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable() > >>>>>>>>> before entering synchronized (checkpointLock) and collecting records. > >>>>>>>>> Please let me know if there is a better solution. > >>>>>>>>> > >>>>>>>>> Thanks > >>>>>>>>> fanrui > >>>>>>>>> > >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov < > >>>>>> kaa....@yandex.com> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi. > >>>>>>>>>> > >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or > >>>>> two > >>>>>> commits > >>>>>>>> in a > >>>>>>>>>> PR? > >>>>>>>>>> > >>>>>>>>>> Perhaps a separate ticket will be better since this > >>>>> task has > >>>>>> fewer > >>>>>>>>>> questions, but we should find a solution for LegacySource > >>>>> first. > >>>>>>>>>> > >>>>>>>>>> -- 2. For the first task, if the Flink user disables the > >>>>> Unaligned > >>>>>>>>>> Checkpoint, do we ignore max buffers per channel? 
Because the overdraft isn't useful for the Aligned Checkpoint; it still
needs to wait for the downstream task to consume.

I think the logic should be the same for AC and UC. As I understand it,
the overdraft may not really help for AC, but it doesn't make things
worse either.

3. For the second task

-- The default value of maxOverdraftBuffersPerPartition may also need to
be discussed.

I think it should be a pretty small value, or even 0, since it is a kind
of optimization and users should understand what they are doing
(especially if we implement the first task).

-- If the user disables the Unaligned Checkpoint, can we set
maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
the Aligned Checkpoint.

The same answer as above: if the overdraft doesn't cause a degradation
for the Aligned Checkpoint, I don't think we should differentiate between
AC and UC.

4. For the legacy source

-- If the Unaligned Checkpoint is enabled, it uses up to
maxOverdraftBuffersPerPartition buffers.
-- If the UC is disabled, it doesn't use the overdraft buffer. Do you
think that's ok?

Ideally, I don't want to use the overdraft for LegacySource at all, since
it can lead to undesirable results, especially if the limit is high. At
least as I understand it, it would always work in overdraft mode and
would borrow maxOverdraftBuffersPerPartition buffers from the global
pool, which can degrade other subtasks on the same TaskManager.

-- Actually, we added the checkAvailable logic for LegacySource in our
internal version. It works well.

I don't really understand how that is possible in the general case,
considering that each user has their own implementation of the legacy
source operator.

-- 5. For the benchmark, do you have any suggestions? I submitted the PR
[1].

I haven't looked at it yet, but I'll try to do so soon.

On 29.04.2022 14:14, rui fan wrote:

Hi,

Thanks for your feedback. I have several questions.

1. Do you mean splitting this into two JIRAs, two PRs, or two commits in
one PR?
2.
For the first task, if the Flink user disables the Unaligned Checkpoint,
do we ignore max buffers per channel? Because the overdraft isn't useful
for the Aligned Checkpoint; it still needs to wait for the downstream
task to consume.
3. For the second task
   - The default value of maxOverdraftBuffersPerPartition may also need
     to be discussed.
   - If the user disables the Unaligned Checkpoint, can we set
     maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
     useful for the Aligned Checkpoint.
4. For the legacy source
   - If the Unaligned Checkpoint is enabled, it uses up to
     maxOverdraftBuffersPerPartition buffers.
   - If the UC is disabled, it doesn't use the overdraft buffer.
   - Do you think it's ok?
   - Actually, we added the checkAvailable logic for LegacySource in our
     internal version. It works well.
5. For the benchmark, do you have any suggestions? I submitted the PR
[1].
[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com>
wrote:

Hi,

We discussed this a little with Dawid Wysakowicz. Here are some
conclusions:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel. This means that
if we request a memory segment from the LocalBufferPool and
maxBuffersPerChannel is reached for this channel, we just ignore that and
continue to allocate buffers while the LocalBufferPool has them (this is
actually not an overdraft).

The second task is about the real overdraft. I am pretty convinced now
that we unfortunately need a configuration option to limit the overdraft
number (because it is not ok if one subtask allocates all buffers of a
TaskManager, considering that several different jobs can be submitted to
the same TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per
LocalBufferPool).
In this case, when the buffer limit of the LocalBufferPool is reached,
the LocalBufferPool can additionally request up to
maxOverdraftBuffersPerPartition buffers from the NetworkBufferPool.

But it is still not clear how to handle LegacySource, since it
effectively works as an unbounded flatmap and would always run in
overdraft mode, which is not the goal. So we still need to think about
that.

On 29.04.2022 11:11, rui fan wrote:

Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can request, right?

I prefer to hard-code maxOverdraftBuffers in order not to add a new
configuration option. And I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

Hi Anton Kalashnikov,

Thanks for your very clear reply; I think you are totally right.
The 'maxBuffersNumber - buffersInUseNumber' can be used as the overdraft
buffer, so it won't need a new buffer configuration. Flink users can turn
up maxBuffersNumber to control the overdraft buffer size.

Also, I'd like to add some information. For safety, we should limit the
maximum number of overdraft segments that each LocalBufferPool can
request.

Why do we limit it?
Some operators don't check `recordWriter.isAvailable` while processing
records, such as LegacySource. I mentioned this in FLINK-26759 [1]. I'm
not sure if there are other cases.

If we don't add the limitation, the LegacySource will use up all the
remaining memory in the NetworkBufferPool when the backpressure is
severe.

How to limit it?
I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions` in the
constructor of LocalBufferPool. The maxOverdraftBuffers is just for
safety, and it should be enough for most Flink jobs. Or we can set
`maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle jobs
of low parallelism.
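The proposed hard-coded limit can be sketched in a couple of lines. The class and method names here are made up for illustration; only the `Math.max(numberOfSubpartitions, 10)` formula comes from the mail above:

```java
// Sketch of the proposed per-LocalBufferPool overdraft cap: at least 10
// segments, so low-parallelism jobs also get a useful overdraft.
// Illustrative only; not Flink's actual implementation.
public class OverdraftLimit {

    static int maxOverdraftBuffers(int numberOfSubpartitions) {
        return Math.max(numberOfSubpartitions, 10);
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(4));   // low parallelism: floor of 10
        System.out.println(maxOverdraftBuffers(128)); // high parallelism: 128
    }
}
```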
Also, if the user doesn't enable the Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because the
overdraft isn't useful for the Aligned Checkpoint.

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
wrote:

Hi fanrui,

Thanks for creating the FLIP.

In general, I think the overdraft is a good idea and it should help in
the cases described above. Here are my thoughts about configuration.
Please correct me if I am wrong, but as I understand it, right now we
have the following calculation:

maxBuffersNumber (per TaskManager) = Network memory (calculated via
taskmanager.memory.network.fraction, taskmanager.memory.network.min,
taskmanager.memory.network.max and the total memory size) /
taskmanager.memory.segment-size
requiredBuffersNumber (per TaskManager) = (exclusive buffers *
parallelism + floating buffers) * number of subtasks in the TaskManager

buffersInUseNumber = the real number of buffers in use at the current
moment (always <= requiredBuffersNumber)

Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which
lets Flink work predictably. But if requiredBuffersNumber >
maxBuffersNumber, sometimes that is also fine (though not good), since
not all required buffers are really mandatory (e.g. it is ok if Flink
cannot allocate the floating buffers).

But if maxBuffersNumber > requiredBuffersNumber, as I understand it,
Flink just never uses these leftover buffers (maxBuffersNumber -
requiredBuffersNumber), which is what I propose to use. (We could
actually even use the difference 'requiredBuffersNumber -
buffersInUseNumber', since one TaskManager can contain several operators,
including 'window', which can temporarily borrow buffers from the global
pool.)
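As a concrete illustration of the accounting above, here is a small sketch. The formulas mirror the mail; the method names, the 32 KiB segment size, and the sample numbers are assumptions for illustration, not values from a real deployment:

```java
// Worked example of the buffer accounting described above.
public class BufferMath {

    // maxBuffersNumber = network memory / taskmanager.memory.segment-size
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    // requiredBuffersNumber =
    //   (exclusive buffers per channel * channels + floating buffers) * subtasks
    static long requiredBuffersNumber(
            int exclusivePerChannel, int channels, int floatingPerGate, int subtasks) {
        return (long) (exclusivePerChannel * channels + floatingPerGate) * subtasks;
    }

    public static void main(String[] args) {
        long max = maxBuffersNumber(128L * 1024 * 1024, 32 * 1024); // 128 MiB / 32 KiB
        long required = requiredBuffersNumber(2, 100, 8, 4);
        // The leftover (max - required) is what the proposal would let the
        // overdraft use.
        System.out.println(max + " " + required + " " + (max - required));
    }
}
```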
My proposal, more specifically (it relates only to requesting buffers
while processing a single record; switching to unavailability between
records should stay the same as it is now):

* If one more buffer is requested but maxBuffersPerChannel is reached,
then just ignore this limitation and allocate the buffer from any place
(from the LocalBufferPool if it still has something, otherwise from the
NetworkBufferPool).

* If the LocalBufferPool exceeds its limit, then temporarily allocate
from the NetworkBufferPool while it has something to allocate.

Maybe I missed something and this solution won't work, but I like it
since, on the one hand, it works out of the box without any
configuration, and on the other hand, it can be configured by changing
the proportion of maxBuffersNumber and requiredBuffersNumber.
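The two-step allocation described above can be modeled with a tiny sketch. Every name here is illustrative (the real LocalBufferPool logic is considerably more involved); it only demonstrates the "serve locally first, then overdraw from a shared pool up to a cap" idea:

```java
// Minimal model of the proposal: serve a request from the local pool first;
// once the local limit is hit, temporarily "overdraw" from a shared
// (network) pool, bounded by an overdraft limit. Illustrative names only.
public class OverdraftPoolSketch {
    private int localAvailable;     // buffers left in this LocalBufferPool
    private int globalAvailable;    // buffers left in the NetworkBufferPool
    private final int maxOverdraft; // cap on borrowed buffers per pool
    private int overdraftInUse;

    OverdraftPoolSketch(int local, int global, int maxOverdraft) {
        this.localAvailable = local;
        this.globalAvailable = global;
        this.maxOverdraft = maxOverdraft;
    }

    /** Returns true if a buffer was handed out, false if the caller must wait. */
    boolean requestBuffer() {
        if (localAvailable > 0) { // normal path: local pool still has buffers
            localAvailable--;
            return true;
        }
        // Local limit reached: borrow from the global pool, up to maxOverdraft.
        if (overdraftInUse < maxOverdraft && globalAvailable > 0) {
            overdraftInUse++;
            globalAvailable--;
            return true;
        }
        return false;
    }
}
```

A pool with one local buffer and an overdraft cap of 2 hands out three buffers and then forces the caller to wait, which is exactly the bounded-overdraft behaviour argued for above.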
The last thing I want to say is that I don't really want to implement a
new configuration option, since even now it is not clear how to correctly
configure network buffers with the existing options, and I don't want to
complicate it further, especially if the problem can be resolved
automatically (as described above).

So is my understanding of network memory/buffers correct?

--
Best regards,
Anton Kalashnikov

On 27.04.2022 07:46, rui fan wrote:

Hi everyone,

Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
effectively solves the problem of checkpoint timeouts or slow checkpoints
when backpressure is severe.

We found that UC (Unaligned Checkpoint) does not work well when the
backpressure is severe and multiple output buffers are required to
process a single record. FLINK-14396 [2] also mentioned this issue
before. So we propose the overdraft buffer to solve it.
I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft buffer
mechanism. After discussing with Anton Kalashnikov, there are still some
points to discuss:

* There are already a lot of buffer-related configuration options. Do we
  need to add a new one for the overdraft buffer?
* Where should the overdraft buffer take its memory from?
  * If the overdraft buffer uses the memory remaining in the
    NetworkBufferPool, no new configuration option needs to be added.
  * If adding a new configuration option:
    * Should we set the overdraft-memory-size at the TM level or the
      Task level?
    * Or set overdraft-buffers to indicate the number of memory segments
      that can be overdrawn?
    * What is the default value? How do we choose sensible defaults?

Currently, I have implemented a POC [5] and verified it using
flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
with a default value of 10. That is: each LocalBufferPool can overdraw
up to 10 memory segments.

Looking forward to your feedback!
Thanks,
fanrui

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-14396
[3] https://issues.apache.org/jira/browse/FLINK-26762
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
[6] https://github.com/apache/flink-benchmarks/pull/54

--
Best regards,
Anton Kalashnikov