Hi, I created FLINK-27530 [1] as the parent ticket and updated the FLIP with it.
[1] https://issues.apache.org/jira/browse/FLINK-27530 Thanks fanrui On Fri, May 6, 2022 at 4:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think? > > Can you create a parent ticket for the whole FLIP to group all of the > issues together? > > Also FLIP should be officially voted first. > > Best, > Piotrek > > pt., 6 maj 2022 o 09:08 rui fan <1996fan...@gmail.com> napisał(a): > > > Hi Anton, Piotrek and Dawid, > > > > Thanks for your help. > > > > I created FLINK-27522[1] as the first task. And I will finish it asap. > > > > @Piotrek, for the default value, do you think it should be less > > than 5? What do you think about 3? Actually, I think 5 isn't big. > > It's 1 or 3 or 5 that doesn't matter much, the focus is on > > reasonably resolving deadlock problems. Or I push the second > > task to move forward first and we discuss the default value in PR. > > > > For the legacySource, I got your idea. And I propose we create > > the third task to handle it. Because it is independent and for > > compatibility with the old API. What do you think? I updated > > the third task on FLIP-227[2]. > > > > If all is ok, I will create a JIRA for the third Task and add it to > > FLIP-227. And I will develop them from the first task to the > > third task. > > > > Thanks again for your help. > > > > [1] https://issues.apache.org/jira/browse/FLINK-27522 > > [2] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > > > Thanks > > fanrui > > > > On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org> > > wrote: > > > > > Hi fanrui, > > > > > > > How to identify legacySource? > > > > > > legacy sources are always using the SourceStreamTask class and > > > SourceStreamTask is used only for legacy sources. But I'm not sure how > to > > > enable/disable that. Adding `disableOverdraft()` call in > SourceStreamTask > > > would be better compared to relying on the `getAvailableFuture()` call > > > (isn't it used for back pressure metric anyway?). Ideally we should > > > enable/disable it in the constructors, but that might be tricky. > > > > > > > I prefer it to be between 5 and 10 > > > > > > I would vote for a smaller value because of FLINK-13203 > > > > > > Piotrek > > > > > > > > > > > > czw., 5 maj 2022 o 11:49 rui fan <1996fan...@gmail.com> napisał(a): > > > > > >> Hi, > > >> > > >> Thanks a lot for your discussion. > > >> > > >> After several discussions, I think it's clear now. I updated the > > >> "Proposed Changes" of FLIP-227[1]. If I have something > > >> missing, please help to add it to FLIP, or add it in the mail > > >> and I can add it to FLIP. If everything is OK, I will create a > > >> new JIRA for the first task, and use FLINK-26762[2] as the > > >> second task. > > >> > > >> About the legacy source, do we set maxOverdraftBuffersPerGate=0 > > >> directly? How to identify legacySource? Or could we add > > >> the overdraftEnabled in LocalBufferPool? The default value > > >> is false. If the getAvailableFuture is called, change > > >> overdraftEnabled=true. > > >> It indicates whether there are checks isAvailable elsewhere. > > >> It might be more general, it can cover more cases. > > >> > > >> Also, I think the default value of 'max-overdraft-buffers-per-gate' > > >> needs to be confirmed. I prefer it to be between 5 and 10. How > > >> do you think? 
> > >> > > >> [1] > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > >> [2] https://issues.apache.org/jira/browse/FLINK-26762 > > >> > > >> Thanks > > >> fanrui > > >> > > >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> > > >> wrote: > > >> > > >>> Hi again, > > >>> > > >>> After sleeping over this, if both versions (reserve and overdraft) > have > > >>> the same complexity, I would also prefer the overdraft. > > >>> > > >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, > as > > >>> > Dawid mentioned, I think it is dangerous since it is too implicit > for > > >>> > the user and if the user submits one more job for the same > TaskManger > > >>> > > >>> As I mentioned, it's not only an issue with multiple jobs. The same > > >>> problem can happen with different subtasks from the same job, > > potentially > > >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I > > would be > > >>> in favour of Integer.MAX_VALUE to be the default value, but as it > is, I > > >>> think we should indeed play on the safe side and limit it. > > >>> > > >>> > I still don't understand how should be limited "reserve" > > >>> implementation. > > >>> > I mean if we have X buffers in total and the user sets overdraft > > equal > > >>> > to X we obviously can not reserve all buffers, but how many we are > > >>> > allowed to reserve? Should it be a different configuration like > > >>> > percentegeForReservedBuffers? > > >>> > > >>> The reserve could be defined as percentage, or as a fixed number of > > >>> buffers. But yes. In normal operation subtask would not use the > > reserve, as > > >>> if numberOfAvailableBuffers < reserve, the output would be not > > available. > > >>> Only in the flatMap/timers/huge records case the reserve could be > used. > > >>> > > >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers, > > >>> will LocalBufferPool never be available? Can't process data? > > >>> > > >>> Of course we would need to make sure that never happens. So the > reserve > > >>> should be < total buffer size. > > >>> > > >>> > 2. If the overdraft buffer use the extra buffers, when the > downstream > > >>> > task inputBuffer is insufficient, it should fail to start the job, > > and > > >>> then > > >>> > restart? When the InputBuffer is initialized, it will apply for > > enough > > >>> > buffers, right? > > >>> > > >>> The failover if downstream can not allocate buffers is already > > >>> implemented FLINK-14872 [2]. There is a timeout for how long the task > > is > > >>> waiting for buffer allocation. However this doesn't prevent many > > >>> (potentially infinitely many) deadlock/restarts cycles. IMO the > propper > > >>> solution for [1] would be 2b described in the ticket: > > >>> > > >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. > This > > >>> is a simplified version of 2a, without tracking the tasks > > sink-to-source. > > >>> > > >>> But that's a pre-existing problem and I don't think we have to solve > it > > >>> before implementing overdraft. I think we would need to solve it only > > >>> before setting Integer.MAX_VALUE as the default for the overdraft. > > Maybe I > > >>> would hesitate setting the overdraft to anything more then a couple > of > > >>> buffers by default for the same reason. 
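For illustration, the per-gate overdraft limit being debated above could be declared roughly like this with Flink's ConfigOptions builder. The full key, description text, and the default of 5 below are assumptions on my part; the thread has not settled on a name or a default yet.

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Hypothetical declaration of the per-gate overdraft limit discussed in this thread. */
public class OverdraftOptions {

    // Key and default value are placeholders; the thread discusses defaults between 0 and 10.
    public static final ConfigOption<Integer> MAX_OVERDRAFT_BUFFERS_PER_GATE =
            ConfigOptions.key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
                    .intType()
                    .defaultValue(5)
                    .withDescription(
                            "Maximum number of extra network buffers a LocalBufferPool may "
                                    + "request on top of its configured size while a single "
                                    + "record is being processed.");
}
```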
> > >>> > > >>> > Actually, I totally agree that we don't need a lot of buffers for > > >>> overdraft > > >>> > > >>> and > > >>> > > >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. > > >>> > When we finish this feature and after users use it, if users > feedback > > >>> > this issue we can discuss again. > > >>> > > >>> +1 > > >>> > > >>> Piotrek > > >>> > > >>> [1] https://issues.apache.org/jira/browse/FLINK-13203 > > >>> [2] https://issues.apache.org/jira/browse/FLINK-14872 > > >>> > > >>> czw., 5 maj 2022 o 05:52 rui fan <1996fan...@gmail.com> napisał(a): > > >>> > > >>>> Hi everyone, > > >>>> > > >>>> I still have some questions. > > >>>> > > >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, > > will > > >>>> LocalBufferPool never be available? Can't process data? > > >>>> 2. If the overdraft buffer use the extra buffers, when the > downstream > > >>>> task inputBuffer is insufficient, it should fail to start the job, > and > > >>>> then > > >>>> restart? When the InputBuffer is initialized, it will apply for > enough > > >>>> buffers, right? > > >>>> > > >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. > > >>>> When we finish this feature and after users use it, if users > feedback > > >>>> this issue we can discuss again. > > >>>> > > >>>> Thanks > > >>>> fanrui > > >>>> > > >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz < > > dwysakow...@apache.org> > > >>>> wrote: > > >>>> > > >>>>> Hey all, > > >>>>> > > >>>>> I have not replied in the thread yet, but I was following the > > >>>>> discussion. > > >>>>> > > >>>>> Personally, I like Fanrui's and Anton's idea. As far as I > understand > > >>>>> it > > >>>>> the idea to distinguish between inside flatMap & outside would be > > >>>>> fairly > > >>>>> simple, but maybe slightly indirect. The checkAvailability would > > >>>>> remain > > >>>>> unchanged and it is checked always between separate invocations of > > the > > >>>>> UDF. Therefore the overdraft buffers would not apply there. However > > >>>>> once > > >>>>> the pool says it is available, it means it has at least an initial > > >>>>> buffer. So any additional request without checking for availability > > >>>>> can > > >>>>> be considered to be inside of processing a single record. This does > > >>>>> not > > >>>>> hold just for the LegacySource as I don't think it actually checks > > for > > >>>>> the availability of buffers in the LocalBufferPool. > > >>>>> > > >>>>> In the offline chat with Anton, we also discussed if we need a > limit > > >>>>> of > > >>>>> the number of buffers we could overdraft (or in other words if the > > >>>>> limit > > >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer to > > >>>>> stay > > >>>>> on the safe side and have it limited. The pool of network buffers > is > > >>>>> shared for the entire TaskManager, so it means it can be shared > even > > >>>>> across tasks of separate jobs. However, I might be just > unnecessarily > > >>>>> cautious here. > > >>>>> > > >>>>> Best, > > >>>>> > > >>>>> Dawid > > >>>>> > > >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote: > > >>>>> > Hi, > > >>>>> > > > >>>>> > Thanks for the answers. > > >>>>> > > > >>>>> >> we may still need to discuss whether the > > >>>>> >> overdraft/reserve/spare should use extra buffers or buffers > > >>>>> >> in (exclusive + floating buffers)? > > >>>>> > and > > >>>>> > > > >>>>> >> These things resolve the different problems (at least as I see > > >>>>> that). 
> > >>>>> >> The current hardcoded "1" says that we switch "availability" to > > >>>>> >> "unavailability" when one more buffer is left(actually a little > > less > > >>>>> >> than one buffer since we write the last piece of data to this > last > > >>>>> >> buffer). The overdraft feature doesn't change this logic we > still > > >>>>> want > > >>>>> >> to switch to "unavailability" in such a way but if we are > already > > in > > >>>>> >> "unavailability" and we want more buffers then we can take > > >>>>> "overdraft > > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we > need > > >>>>> to > > >>>>> >> understand when we should switch to "unavailability" > > >>>>> > Ok, I see. So it seems to me that both of you have in mind to > keep > > >>>>> the > > >>>>> > buffer pools as they are right now, but if we are in the middle > of > > >>>>> > processing a record, we can request extra overdraft buffers on > top > > of > > >>>>> > those? This is another way to implement the overdraft to what I > was > > >>>>> > thinking. I was thinking about something like keeping the > > >>>>> "overdraft" or > > >>>>> > more precisely buffer "reserve" in the buffer pool. I think my > > >>>>> version > > >>>>> > would be easier to implement, because it is just fiddling with > > >>>>> min/max > > >>>>> > buffers calculation and slightly modified `checkAvailability()` > > >>>>> logic. > > >>>>> > > > >>>>> > On the other hand what you have in mind would better utilise the > > >>>>> available > > >>>>> > memory, right? It would require more code changes (how would we > > know > > >>>>> when > > >>>>> > we are allowed to request the overdraft?). However, in this > case, I > > >>>>> would > > >>>>> > be tempted to set the number of overdraft buffers by default to > > >>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers > as > > >>>>> > necessary. The only downside that I can think of (apart of higher > > >>>>> > complexity) would be higher chance of hitting a known/unsolved > > >>>>> deadlock [1] > > >>>>> > in a scenario: > > >>>>> > - downstream task hasn't yet started > > >>>>> > - upstream task requests overdraft and uses all available memory > > >>>>> segments > > >>>>> > from the global pool > > >>>>> > - upstream task is blocked, because downstream task hasn't > started > > >>>>> yet and > > >>>>> > can not consume any data > > >>>>> > - downstream task tries to start, but can not, as there are no > > >>>>> available > > >>>>> > buffers > > >>>>> > > > >>>>> >> BTW, for watermark, the number of buffers it needs is > > >>>>> >> numberOfSubpartitions. So if > > overdraftBuffers=numberOfSubpartitions, > > >>>>> >> the watermark won't block in requestMemory. > > >>>>> > and > > >>>>> > > > >>>>> >> the best overdraft size will be equal to parallelism. > > >>>>> > That's a lot of buffers. I don't think we need that many for > > >>>>> broadcasting > > >>>>> > watermarks. Watermarks are small, and remember that every > > >>>>> subpartition has > > >>>>> > some partially filled/empty WIP buffer, so the vast majority of > > >>>>> > subpartitions will not need to request a new buffer. 
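A minimal standalone sketch of the flow described above, under the interpretation Dawid and Anton give: the availability signal checked between records keeps today's "at least one buffer free" semantics, while requests issued without an intervening availability check (i.e. while a single record is still being written) may exceed the pool size by up to a configured overdraft. Class and field names are made up for illustration and are not the real LocalBufferPool internals.

```java
/** Toy model of "overdraft on top of the normal pool size" (not Flink code). */
public class OverdraftPoolSketch {
    private final int currentPoolSize;      // ordinary budget: exclusive + floating buffers
    private final int maxOverdraftBuffers;  // extra buffers allowed only while inside a record
    private int usedBuffers;

    public OverdraftPoolSketch(int currentPoolSize, int maxOverdraftBuffers) {
        this.currentPoolSize = currentPoolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Checked between records: unchanged, flips to unavailable when the last buffer is taken. */
    public boolean isAvailable() {
        return usedBuffers + 1 <= currentPoolSize;
    }

    /**
     * Called while a record is being written (flatMap fan-out, timers, a huge record).
     * May go beyond the pool size, but only by the overdraft amount.
     */
    public boolean tryRequestBufferDuringRecord() {
        if (usedBuffers < currentPoolSize + maxOverdraftBuffers) {
            usedBuffers++;
            return true;
        }
        return false; // caller blocks, same as before the overdraft feature
    }

    public void recycle() {
        usedBuffers--;
    }
}
```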
> > >>>>> > > > >>>>> > Best, > > >>>>> > Piotrek > > >>>>> > > > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203 > > >>>>> > > > >>>>> > wt., 3 maj 2022 o 17:15 Anton Kalashnikov <kaa....@yandex.com> > > >>>>> napisał(a): > > >>>>> > > > >>>>> >> Hi, > > >>>>> >> > > >>>>> >> > > >>>>> >> >> Do you mean to ignore it while processing records, but keep > > >>>>> using > > >>>>> >> `maxBuffersPerChannel` when calculating the availability of the > > >>>>> output? > > >>>>> >> > > >>>>> >> > > >>>>> >> Yes, it is correct. > > >>>>> >> > > >>>>> >> > > >>>>> >> >> Would it be a big issue if we changed it to check if at > least > > >>>>> >> "overdraft number of buffers are available", where "overdraft > > >>>>> number" is > > >>>>> >> configurable, instead of the currently hardcoded value of "1"? > > >>>>> >> > > >>>>> >> > > >>>>> >> These things resolve the different problems (at least as I see > > >>>>> that). > > >>>>> >> The current hardcoded "1" says that we switch "availability" to > > >>>>> >> "unavailability" when one more buffer is left(actually a little > > less > > >>>>> >> than one buffer since we write the last piece of data to this > last > > >>>>> >> buffer). The overdraft feature doesn't change this logic we > still > > >>>>> want > > >>>>> >> to switch to "unavailability" in such a way but if we are > already > > in > > >>>>> >> "unavailability" and we want more buffers then we can take > > >>>>> "overdraft > > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we > need > > >>>>> to > > >>>>> >> understand when we should switch to "unavailability" > > >>>>> >> > > >>>>> >> > > >>>>> >> -- About "reserve" vs "overdraft" > > >>>>> >> > > >>>>> >> As Fanrui mentioned above, perhaps, the best overdraft size will > > be > > >>>>> >> equal to parallelism. Also, the user can set any value he wants. > > So > > >>>>> even > > >>>>> >> if parallelism is small(~5) but the user's flatmap produces a > lot > > of > > >>>>> >> data, the user can set 10 or even more. Which almost double the > > max > > >>>>> >> buffers and it will be impossible to reserve. At least we need > to > > >>>>> figure > > >>>>> >> out how to protect from such cases (the limit for an > overdraft?). > > So > > >>>>> >> actually it looks even more difficult than increasing the > maximum > > >>>>> buffers. > > >>>>> >> > > >>>>> >> I want to emphasize that overdraft buffers are soft > configuration > > >>>>> which > > >>>>> >> means it takes as many buffers as the global buffers pool has > > >>>>> >> available(maybe zero) but less than this configured value. It is > > >>>>> also > > >>>>> >> important to notice that perhaps, not many subtasks in > TaskManager > > >>>>> will > > >>>>> >> be using this feature so we don't actually need a lot of > available > > >>>>> >> buffers for every subtask(Here, I mean that if we have only one > > >>>>> >> window/flatmap operator and many other operators, then one > > >>>>> TaskManager > > >>>>> >> will have many ordinary subtasks which don't actually need > > >>>>> overdraft and > > >>>>> >> several subtasks that needs this feature). But in case of > > >>>>> reservation, > > >>>>> >> we will reserve some buffers for all operators even if they > don't > > >>>>> really > > >>>>> >> need it. > > >>>>> >> > > >>>>> >> > > >>>>> >> -- Legacy source problem > > >>>>> >> > > >>>>> >> If we still want to change max buffers then it is problem for > > >>>>> >> LegacySources(since every subtask of source will always use > these > > >>>>> >> overdraft). 
But right now, I think that we can force to set 0 > > >>>>> overdraft > > >>>>> >> buffers for legacy subtasks in configuration during execution(if > > it > > >>>>> is > > >>>>> >> not too late for changing configuration in this place). > > >>>>> >> > > >>>>> >> > > >>>>> >> 03.05.2022 14:11, rui fan пишет: > > >>>>> >>> Hi > > >>>>> >>> > > >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree with > > >>>>> >>> ignoring the legacy source, it will affect our design. User > > should > > >>>>> >>> use the new Source Api as much as possible. > > >>>>> >>> > > >>>>> >>> Hi Piotrek, we may still need to discuss whether the > > >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers > > >>>>> >>> in (exclusive + floating buffers)? They have some differences. > > >>>>> >>> > > >>>>> >>> If it uses extra buffers: > > >>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1 > > >>>>> >>> <= currentPoolSize) and all subpartitions don't reach the > > >>>>> >>> maxBuffersPerChannel. > > >>>>> >>> > > >>>>> >>> If it uses the buffers in (exclusive + floating buffers): > > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + > > >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions > > >>>>> >>> don't reach the maxBuffersPerChannel. > > >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), > > the > > >>>>> >>> usedBuffers will be small. That is the LocalBufferPool will be > > >>>>> >>> easily unavailable. For throughput, if users turn up the > > >>>>> >>> overdraft buffers, they need to turn up exclusive or floating > > >>>>> >>> buffers. It also affects the InputChannel, and it's is > unfriendly > > >>>>> >>> to users. > > >>>>> >>> > > >>>>> >>> So I prefer the overdraft to use extra buffers. > > >>>>> >>> > > >>>>> >>> > > >>>>> >>> BTW, for watermark, the number of buffers it needs is > > >>>>> >>> numberOfSubpartitions. So if > > >>>>> overdraftBuffers=numberOfSubpartitions, > > >>>>> >>> the watermark won't block in requestMemory. But it has > > >>>>> >>> 2 problems: > > >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses > > >>>>> >>> (exclusive + floating buffers), there will be fewer buffers > > >>>>> >>> available. Throughput may be affected. > > >>>>> >>> 2. The numberOfSubpartitions is different for each Task. > > >>>>> >>> So if users want to cover watermark using this feature, > > >>>>> >>> they don't know how to set the overdraftBuffers more r > > >>>>> >>> easonably. And if the parallelism is changed, users still > > >>>>> >>> need to change overdraftBuffers. It is unfriendly to users. > > >>>>> >>> > > >>>>> >>> So I propose we support overdraftBuffers=-1, It means > > >>>>> >>> we will automatically set > overdraftBuffers=numberOfSubpartitions > > >>>>> >>> in the Constructor of LocalBufferPool. > > >>>>> >>> > > >>>>> >>> Please correct me if I'm wrong. > > >>>>> >>> > > >>>>> >>> Thanks > > >>>>> >>> fanrui > > >>>>> >>> > > >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski < > > >>>>> pnowoj...@apache.org> > > >>>>> >> wrote: > > >>>>> >>>> Hi fanrui, > > >>>>> >>>> > > >>>>> >>>>> Do you mean don't add the extra buffers? We just use > (exclusive > > >>>>> >> buffers * > > >>>>> >>>>> parallelism + floating buffers)? 
The LocalBufferPool will be > > >>>>> available > > >>>>> >>>> when > > >>>>> >>>>> (usedBuffers+overdraftBuffers <= > > >>>>> >>>> exclusiveBuffers*parallelism+floatingBuffers) > > >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, > > >>>>> right? > > >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum > > >>>>> number of > > >>>>> >> the > > >>>>> >>>> required buffers, just as we did when we were implementing the > > non > > >>>>> >> blocking > > >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back > > >>>>> then we > > >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this > > >>>>> logic is > > >>>>> >>>> located > > >>>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition: > > >>>>> >>>> > > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : > > >>>>> numSubpartitions + 1; > > >>>>> >>>> For performance reasons, we always require at least one buffer > > per > > >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if > > we > > >>>>> >> require 5 > > >>>>> >>>> overdraft buffers for output to be available, we need to have > > >>>>> them on > > >>>>> >> top > > >>>>> >>>> of those "one buffer per sub-partition". So the logic should > be > > >>>>> changed > > >>>>> >> to: > > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : > > >>>>> numSubpartitions + > > >>>>> >>>> numOverdraftBuffers; > > >>>>> >>>> > > >>>>> >>>> Regarding increasing the number of max buffers I'm not sure. > As > > >>>>> long as > > >>>>> >>>> "overdraft << max number of buffers", because all buffers on > the > > >>>>> outputs > > >>>>> >>>> are shared across all sub-partitions. If we have 5 overdraft > > >>>>> buffers, > > >>>>> >> and > > >>>>> >>>> parallelism of 100, it doesn't matter in the grand scheme of > > >>>>> things if > > >>>>> >> we > > >>>>> >>>> make the output available if at least one single buffer is > > >>>>> available or > > >>>>> >> at > > >>>>> >>>> least 5 buffers are available out of ~200 (100 * 2 + 8). So > > >>>>> effects of > > >>>>> >>>> increasing the overdraft from 1 to for example 5 should be > > >>>>> negligible. > > >>>>> >> For > > >>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 > > still > > >>>>> >> increases > > >>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max > as > > >>>>> it is? > > >>>>> >>>> > > >>>>> >>>> If so, maybe we should change the name from "overdraft" to > > "buffer > > >>>>> >> reserve" > > >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept > > in > > >>>>> >> reserve > > >>>>> >>>> in case of flatMap/firing timers/huge records"? > > >>>>> >>>> > > >>>>> >>>> What do you think Fenrui, Anton? > > >>>>> >>>> > > >>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the > new > > >>>>> >> features, > > >>>>> >>>> as long as we don't brake the existing deployments too much. > > >>>>> >>>> > > >>>>> >>>> Best, > > >>>>> >>>> Piotrek > > >>>>> >>>> > > >>>>> >>>> wt., 3 maj 2022 o 09:20 Martijn Visser <mart...@ververica.com > > > > >>>>> >> napisał(a): > > >>>>> >>>>> Hi everyone, > > >>>>> >>>>> > > >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources: > > >>>>> IMHO, we > > >>>>> >>>> should > > >>>>> >>>>> not focus too much on improving/adding capabilities for > legacy > > >>>>> sources. > > >>>>> >>>> We > > >>>>> >>>>> want to persuade and push users to use the new Source API. 
> Yes, > > >>>>> this > > >>>>> >>>> means > > >>>>> >>>>> that there's work required by the end users to port any > custom > > >>>>> source > > >>>>> >> to > > >>>>> >>>>> the new interface. The benefits of the new Source API should > > >>>>> outweigh > > >>>>> >>>> this. > > >>>>> >>>>> Anything that we build to support multiple interfaces means > > >>>>> adding more > > >>>>> >>>>> complexity and more possibilities for bugs. Let's try to make > > our > > >>>>> >> lives a > > >>>>> >>>>> little bit easier. > > >>>>> >>>>> > > >>>>> >>>>> Best regards, > > >>>>> >>>>> > > >>>>> >>>>> Martijn Visser > > >>>>> >>>>> https://twitter.com/MartijnVisser82 > > >>>>> >>>>> https://github.com/MartijnVisser > > >>>>> >>>>> > > >>>>> >>>>> > > >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> > > >>>>> wrote: > > >>>>> >>>>> > > >>>>> >>>>>> Hi Piotrek > > >>>>> >>>>>> > > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep > > >>>>> using > > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of > > the > > >>>>> >>>> output? > > >>>>> >>>>>> I think yes, and please Anton Kalashnikov to help double > > check. > > >>>>> >>>>>> > > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it > a > > >>>>> big > > >>>>> >>>>> problem > > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we > > already > > >>>>> have > > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. > > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a > > single > > >>>>> >>>> buffer > > >>>>> >>>>>>> available and this works the same for all tasks (including > > >>>>> legacy > > >>>>> >>>>> source > > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check > if > > >>>>> at least > > >>>>> >>>>>>> "overdraft number of buffers are available", where > "overdraft > > >>>>> number" > > >>>>> >>>>> is > > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of > > "1"? > > >>>>> >>>>>> Do you mean don't add the extra buffers? We just use > > (exclusive > > >>>>> >>>> buffers * > > >>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will be > > >>>>> available > > >>>>> >>>>> when > > >>>>> >>>>>> (usedBuffers+overdraftBuffers <= > > >>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers) > > >>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, > > >>>>> right? > > >>>>> >>>>>> > > >>>>> >>>>>> If yes, I think it can solve the problem of legacy source. > > >>>>> There may > > >>>>> >> be > > >>>>> >>>>>> some impact. If overdraftBuffers is large and only one > buffer > > >>>>> is used > > >>>>> >>>> to > > >>>>> >>>>>> process a single record, exclusive buffers*parallelism + > > >>>>> floating > > >>>>> >>>> buffers > > >>>>> >>>>>> cannot be used. It may only be possible to use (exclusive > > >>>>> buffers * > > >>>>> >>>>>> parallelism > > >>>>> >>>>>> + floating buffers - overdraft buffers + 1). For throughput, > > if > > >>>>> turn > > >>>>> >> up > > >>>>> >>>>> the > > >>>>> >>>>>> overdraft buffers, the flink user needs to turn up exclusive > > or > > >>>>> >>>> floating > > >>>>> >>>>>> buffers. And it also affects the InputChannel. > > >>>>> >>>>>> > > >>>>> >>>>>> If not, I don't think it can solve the problem of legacy > > >>>>> source. 
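Side by side, the two availability conditions compared above look roughly like the plain predicates below (illustrative only): with "extra buffers" the check stays as it is today and the overdraft lives on top of the pool size, whereas carving the overdraft out of (exclusive + floating) makes the pool flip to unavailable earlier.

```java
/** Side-by-side sketch of the two availability conditions discussed in the thread. */
public class AvailabilityVariants {

    /** Variant 1: overdraft uses extra buffers on top of the pool size; check unchanged. */
    static boolean availableWithExtraBuffers(int usedBuffers, int currentPoolSize) {
        return usedBuffers + 1 <= currentPoolSize;
    }

    /** Variant 2: overdraft is carved out of (exclusive * parallelism + floating). */
    static boolean availableWithCarvedOutOverdraft(
            int usedBuffers, int overdraftBuffers, int currentPoolSize) {
        // the pool becomes unavailable earlier, keeping overdraftBuffers in reserve
        return usedBuffers + overdraftBuffers <= currentPoolSize;
    }

    public static void main(String[] args) {
        // Example: parallelism 4, 2 exclusive buffers per channel, 8 floating buffers.
        int poolSize = 4 * 2 + 8; // 16
        System.out.println(availableWithExtraBuffers(15, poolSize));          // true
        System.out.println(availableWithCarvedOutOverdraft(15, 4, poolSize)); // false
    }
}
```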
The > > >>>>> >>>>> legacy > > >>>>> >>>>>> source don't check isAvailable, If there are the extra > > buffers, > > >>>>> legacy > > >>>>> >>>>>> source > > >>>>> >>>>>> will use them up until block in requestMemory. > > >>>>> >>>>>> > > >>>>> >>>>>> > > >>>>> >>>>>> Thanks > > >>>>> >>>>>> fanrui > > >>>>> >>>>>> > > >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski < > > >>>>> pnowoj...@apache.org> > > >>>>> >>>>>> wrote: > > >>>>> >>>>>> > > >>>>> >>>>>>> Hi, > > >>>>> >>>>>>> > > >>>>> >>>>>>> +1 for the general proposal from my side. It would be a > nice > > >>>>> >>>> workaround > > >>>>> >>>>>>> flatMaps, WindowOperators and large records issues with > > >>>>> unaligned > > >>>>> >>>>>>> checkpoints. > > >>>>> >>>>>>> > > >>>>> >>>>>>>> The first task is about ignoring max buffers per channel. > > This > > >>>>> >>>> means > > >>>>> >>>>> if > > >>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the > > >>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just > > >>>>> ignore > > >>>>> >>>> that > > >>>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has > > >>>>> it(it is > > >>>>> >>>>>>>> actually not a overdraft). > > >>>>> >>>>>>> Do you mean to ignore it while processing records, but keep > > >>>>> using > > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of > > the > > >>>>> >>>> output? > > >>>>> >>>>>>>> The second task is about the real overdraft. I am pretty > > >>>>> convinced > > >>>>> >>>>> now > > >>>>> >>>>>>>> that we, unfortunately, need configuration for limitation > of > > >>>>> >>>>> overdraft > > >>>>> >>>>>>>> number(because it is not ok if one subtask allocates all > > >>>>> buffers of > > >>>>> >>>>> one > > >>>>> >>>>>>>> TaskManager considering that several different jobs can be > > >>>>> >>>> submitted > > >>>>> >>>>> on > > >>>>> >>>>>>>> this TaskManager). So idea is to have > > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per > > >>>>> >>>>>> LocalBufferPool). > > >>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool > is > > >>>>> >>>> reached, > > >>>>> >>>>>>>> LocalBufferPool can request additionally from > > >>>>> NetworkBufferPool up > > >>>>> >>>> to > > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers. > > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is it > a > > >>>>> big > > >>>>> >>>>> problem > > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we > > already > > >>>>> have > > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. > > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a > > single > > >>>>> >>>> buffer > > >>>>> >>>>>>> available and this works the same for all tasks (including > > >>>>> legacy > > >>>>> >>>>> source > > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check > if > > >>>>> at least > > >>>>> >>>>>>> "overdraft number of buffers are available", where > "overdraft > > >>>>> number" > > >>>>> >>>>> is > > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of > > "1"? > > >>>>> >>>>>>> > > >>>>> >>>>>>> Best, > > >>>>> >>>>>>> Piotrek > > >>>>> >>>>>>> > > >>>>> >>>>>>> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> > > >>>>> napisał(a): > > >>>>> >>>>>>> > > >>>>> >>>>>>>> Let me add some information about the LegacySource. > > >>>>> >>>>>>>> > > >>>>> >>>>>>>> If we want to disable the overdraft buffer for > LegacySource. 
> > >>>>> >>>>>>>> Could we add the enableOverdraft in LocalBufferPool? > > >>>>> >>>>>>>> The default value is false. If the getAvailableFuture is > > >>>>> called, > > >>>>> >>>>>>>> change enableOverdraft=true. It indicates whether there > are > > >>>>> >>>>>>>> checks isAvailable elsewhere. > > >>>>> >>>>>>>> > > >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct > > me > > >>>>> if > > >>>>> >>>> I'm > > >>>>> >>>>>>> wrong. > > >>>>> >>>>>>>> Thanks > > >>>>> >>>>>>>> fanrui > > >>>>> >>>>>>>> > > >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan < > > >>>>> 1996fan...@gmail.com> > > >>>>> >>>>> wrote: > > >>>>> >>>>>>>>> Hi, > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> Thanks for your quick response. > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> For question 1/2/3, we think they are clear. We just need > > to > > >>>>> >>>>> discuss > > >>>>> >>>>>>> the > > >>>>> >>>>>>>>> default value in PR. > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> For the legacy source, you are right. It's difficult for > > >>>>> general > > >>>>> >>>>>>>>> implementation. > > >>>>> >>>>>>>>> Currently, we implement ensureRecordWriterIsAvailable() > in > > >>>>> >>>>>>>>> SourceFunction.SourceContext. And call it in our common > > >>>>> >>>>> LegacySource, > > >>>>> >>>>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs > consume > > >>>>> >>>> kafka, > > >>>>> >>>>> so > > >>>>> >>>>>>>>> fixing FlinkKafkaConsumer solved most of our problems. > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> Core code: > > >>>>> >>>>>>>>> ``` > > >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() { > > >>>>> >>>>>>>>> if (recordWriter == null > > >>>>> >>>>>>>>> || > > >>>>> >>>>>>>>> > > >>>>> >> > > >>>>> > > !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, > > >>>>> >>>>>>>>> false) > > >>>>> >>>>>>>>> || recordWriter.isAvailable()) { > > >>>>> >>>>>>>>> return; > > >>>>> >>>>>>>>> } > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> CompletableFuture<?> resumeFuture = > > >>>>> >>>>>>>> recordWriter.getAvailableFuture(); > > >>>>> >>>>>>>>> try { > > >>>>> >>>>>>>>> resumeFuture.get(); > > >>>>> >>>>>>>>> } catch (Throwable ignored) { > > >>>>> >>>>>>>>> } > > >>>>> >>>>>>>>> } > > >>>>> >>>>>>>>> ``` > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> LegacySource calls > > >>>>> sourceContext.ensureRecordWriterIsAvailable() > > >>>>> >>>>>>>>> before synchronized (checkpointLock) and collects > records. > > >>>>> >>>>>>>>> Please let me know if there is a better solution. > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> Thanks > > >>>>> >>>>>>>>> fanrui > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov < > > >>>>> >>>>>> kaa....@yandex.com> > > >>>>> >>>>>>>>> wrote: > > >>>>> >>>>>>>>> > > >>>>> >>>>>>>>>> Hi. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs > or > > >>>>> two > > >>>>> >>>>>> commits > > >>>>> >>>>>>>> in a > > >>>>> >>>>>>>>>> PR? > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> Perhaps, the separated ticket will be better since this > > >>>>> task has > > >>>>> >>>>>> fewer > > >>>>> >>>>>>>>>> questions but we should find a solution for LegacySource > > >>>>> first. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> -- 2. For the first task, if the flink user disables > the > > >>>>> >>>>> Unaligned > > >>>>> >>>>>>>>>> Checkpoint, do we ignore max buffers per channel? 
> > >>>>> Because > > >>>>> >>>> the > > >>>>> >>>>>>>>>> overdraft > > >>>>> >>>>>>>>>> isn't useful for the Aligned Checkpoint, it still > > >>>>> needs to > > >>>>> >>>>> wait > > >>>>> >>>>>>> for > > >>>>> >>>>>>>>>> downstream Task to consume. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. > > As > > >>>>> I > > >>>>> >>>>>>>> understand, > > >>>>> >>>>>>>>>> the overdraft maybe is not really helpful for AC but it > > >>>>> doesn't > > >>>>> >>>>> make > > >>>>> >>>>>>> it > > >>>>> >>>>>>>>>> worse as well. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> 3. For the second task > > >>>>> >>>>>>>>>> -- - The default value of > > >>>>> maxOverdraftBuffersPerPartition > > >>>>> >>>> may > > >>>>> >>>>>>> also > > >>>>> >>>>>>>>>> need > > >>>>> >>>>>>>>>> to be discussed. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> I think it should be a pretty small value or even 0 > since > > it > > >>>>> >>>> kind > > >>>>> >>>>> of > > >>>>> >>>>>>>>>> optimization and user should understand what they > > >>>>> do(especially > > >>>>> >>>> if > > >>>>> >>>>>> we > > >>>>> >>>>>>>>>> implement the first task). > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, > > >>>>> can we > > >>>>> >>>>> set > > >>>>> >>>>>>> the > > >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the > > >>>>> overdraft > > >>>>> >>>>>> isn't > > >>>>> >>>>>>>>>> useful for > > >>>>> >>>>>>>>>> the Aligned Checkpoint. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> The same answer that above, if the overdraft doesn't > make > > >>>>> >>>>>> degradation > > >>>>> >>>>>>>> for > > >>>>> >>>>>>>>>> the Aligned Checkpoint I don't think that we should make > > >>>>> >>>>> difference > > >>>>> >>>>>>>> between > > >>>>> >>>>>>>>>> AC and UC. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> 4. For the legacy source > > >>>>> >>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses > up > > >>>>> to > > >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers. > > >>>>> >>>>>>>>>> - If disabling the UC, it doesn't use the > > overdraft > > >>>>> >>>> buffer. > > >>>>> >>>>>>>>>> - Do you think it's ok? > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource > at > > >>>>> all > > >>>>> >>>>> since > > >>>>> >>>>>>> it > > >>>>> >>>>>>>>>> can lead to undesirable results especially if the limit > is > > >>>>> high. > > >>>>> >>>>> At > > >>>>> >>>>>>>> least, > > >>>>> >>>>>>>>>> as I understand, it will always work in overdraft mode > and > > >>>>> it > > >>>>> >>>> will > > >>>>> >>>>>>>> borrow > > >>>>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global > > pool > > >>>>> >>>> which > > >>>>> >>>>>> can > > >>>>> >>>>>>>> lead > > >>>>> >>>>>>>>>> to degradation of other subtasks on the same > TaskManager. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> -- - Actually, we added the checkAvailable logic > for > > >>>>> >>>>>> LegacySource > > >>>>> >>>>>>>> in > > >>>>> >>>>>>>>>> our > > >>>>> >>>>>>>>>> internal version. It works well. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> I don't really understand how it is possible for general > > >>>>> case > > >>>>> >>>>>>>> considering > > >>>>> >>>>>>>>>> that each user has their own implementation of > > >>>>> >>>>> LegacySourceOperator > > >>>>> >>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? > I > > >>>>> >>>>> submitted > > >>>>> >>>>>>> the > > >>>>> >>>>>>>> PR > > >>>>> >>>>>>>>>> [1]. 
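Following the enableOverdraft idea raised elsewhere in the thread, one possible way to keep legacy sources out of the overdraft path could look like the sketch below: the pool only switches the overdraft on once getAvailableFuture() has been called, which a legacy source that never checks output availability never does. Field and method names here are illustrative, not the actual LocalBufferPool API.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch: overdraft stays off until someone actually checks availability (not Flink code). */
public class OverdraftGatedPool {
    private boolean overdraftEnabled;          // false by default, so legacy sources never overdraw
    private final int maxOverdraftBuffers;

    public OverdraftGatedPool(int maxOverdraftBuffers) {
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Legacy sources never call this, so overdraftEnabled stays false for them. */
    public CompletableFuture<?> getAvailableFuture() {
        overdraftEnabled = true;
        return CompletableFuture.completedFuture(null);
    }

    /** Effective overdraft budget used when a mid-record request exceeds the pool size. */
    public int effectiveOverdraftBuffers() {
        return overdraftEnabled ? maxOverdraftBuffers : 0;
    }
}
```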
> > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon. > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> 29.04.2022 14:14, rui fan пишет: > > >>>>> >>>>>>>>>>> Hi, > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>> Thanks for your feedback. I have a servel of questions. > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>> 1. Do you mean split this into two JIRAs or two > PRs > > >>>>> or two > > >>>>> >>>>>>> commits > > >>>>> >>>>>>>>>> in a > > >>>>> >>>>>>>>>>> PR? > > >>>>> >>>>>>>>>>> 2. For the first task, if the flink user disables > > the > > >>>>> >>>>>> Unaligned > > >>>>> >>>>>>>>>>> Checkpoint, do we ignore max buffers per channel? > > >>>>> Because > > >>>>> >>>>> the > > >>>>> >>>>>>>>>> overdraft > > >>>>> >>>>>>>>>>> isn't useful for the Aligned Checkpoint, it still > > >>>>> needs to > > >>>>> >>>>>> wait > > >>>>> >>>>>>>> for > > >>>>> >>>>>>>>>>> downstream Task to consume. > > >>>>> >>>>>>>>>>> 3. For the second task > > >>>>> >>>>>>>>>>> - The default value of > > >>>>> maxOverdraftBuffersPerPartition > > >>>>> >>>>> may > > >>>>> >>>>>>> also > > >>>>> >>>>>>>>>> need > > >>>>> >>>>>>>>>>> to be discussed. > > >>>>> >>>>>>>>>>> - If the user disables the Unaligned > Checkpoint, > > >>>>> can we > > >>>>> >>>>> set > > >>>>> >>>>>>> the > > >>>>> >>>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the > > >>>>> >>>> overdraft > > >>>>> >>>>>>> isn't > > >>>>> >>>>>>>>>> useful for > > >>>>> >>>>>>>>>>> the Aligned Checkpoint. > > >>>>> >>>>>>>>>>> 4. For the legacy source > > >>>>> >>>>>>>>>>> - If enabling the Unaligned Checkpoint, it > uses > > >>>>> up to > > >>>>> >>>>>>>>>>> maxOverdraftBuffersPerPartition buffers. > > >>>>> >>>>>>>>>>> - If disabling the UC, it doesn't use the > > >>>>> overdraft > > >>>>> >>>>> buffer. > > >>>>> >>>>>>>>>>> - Do you think it's ok? > > >>>>> >>>>>>>>>>> - Actually, we added the checkAvailable logic > > for > > >>>>> >>>>>>> LegacySource > > >>>>> >>>>>>>>>> in our > > >>>>> >>>>>>>>>>> internal version. It works well. > > >>>>> >>>>>>>>>>> 5. For the benchmark, do you have any > suggestions? > > I > > >>>>> >>>>> submitted > > >>>>> >>>>>>> the > > >>>>> >>>>>>>>>> PR > > >>>>> >>>>>>>>>>> [1]. > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54 > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>> Thanks > > >>>>> >>>>>>>>>>> fanrui > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov < > > >>>>> >>>>>>> kaa....@yandex.com > > >>>>> >>>>>>>>>>> wrote: > > >>>>> >>>>>>>>>>> > > >>>>> >>>>>>>>>>>> Hi, > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> We discuss about it a little with Dawid Wysakowicz. > Here > > >>>>> is > > >>>>> >>>>> some > > >>>>> >>>>>>>>>>>> conclusion: > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> First of all, let's split this into two tasks. > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per > > channel. > > >>>>> >>>> This > > >>>>> >>>>>>> means > > >>>>> >>>>>>>> if > > >>>>> >>>>>>>>>>>> we request a memory segment from LocalBufferPool and > the > > >>>>> >>>>>>>>>>>> maxBuffersPerChannel is reached for this channel, we > > just > > >>>>> >>>>> ignore > > >>>>> >>>>>>> that > > >>>>> >>>>>>>>>>>> and continue to allocate buffer while LocalBufferPool > > has > > >>>>> >>>> it(it > > >>>>> >>>>>> is > > >>>>> >>>>>>>>>>>> actually not a overdraft). > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> The second task is about the real overdraft. 
I am > pretty > > >>>>> >>>>>> convinced > > >>>>> >>>>>>>> now > > >>>>> >>>>>>>>>>>> that we, unfortunately, need configuration for > > limitation > > >>>>> of > > >>>>> >>>>>>>> overdraft > > >>>>> >>>>>>>>>>>> number(because it is not ok if one subtask allocates > all > > >>>>> >>>>> buffers > > >>>>> >>>>>> of > > >>>>> >>>>>>>> one > > >>>>> >>>>>>>>>>>> TaskManager considering that several different jobs > can > > be > > >>>>> >>>>>>> submitted > > >>>>> >>>>>>>> on > > >>>>> >>>>>>>>>>>> this TaskManager). So idea is to have > > >>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition(technically to say per > > >>>>> >>>>>>>>>> LocalBufferPool). > > >>>>> >>>>>>>>>>>> In this case, when a limit of buffers in > LocalBufferPool > > >>>>> is > > >>>>> >>>>>>> reached, > > >>>>> >>>>>>>>>>>> LocalBufferPool can request additionally from > > >>>>> >>>> NetworkBufferPool > > >>>>> >>>>>> up > > >>>>> >>>>>>> to > > >>>>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition buffers. > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource > > >>>>> since it > > >>>>> >>>>>>>> actually > > >>>>> >>>>>>>>>>>> works as unlimited flatmap and it will always work in > > >>>>> >>>> overdraft > > >>>>> >>>>>>> mode > > >>>>> >>>>>>>>>>>> which is not a target. So we still need to think about > > >>>>> that. > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> 29.04.2022 11:11, rui fan пишет: > > >>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov, > > >>>>> >>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>> I think you agree with we should limit the maximum > > >>>>> number of > > >>>>> >>>>>>>> overdraft > > >>>>> >>>>>>>>>>>>> segments that each LocalBufferPool can apply for, > > right? > > >>>>> >>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>> I prefer to hard code the maxOverdraftBuffers due to > > >>>>> don't > > >>>>> >>>> add > > >>>>> >>>>>> the > > >>>>> >>>>>>>> new > > >>>>> >>>>>>>>>>>>> configuration. And I hope to hear more from the > > >>>>> community. > > >>>>> >>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>> Best wishes > > >>>>> >>>>>>>>>>>>> fanrui > > >>>>> >>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan < > > >>>>> >>>>> 1996fan...@gmail.com> > > >>>>> >>>>>>>>>> wrote: > > >>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov, > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are > > >>>>> totally > > >>>>> >>>>>> right. > > >>>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be > > used > > >>>>> as > > >>>>> >>>>> the > > >>>>> >>>>>>>>>>>>>> overdraft buffer, it won't need the new buffer > > >>>>> >>>>>>> configuration.Flink > > >>>>> >>>>>>>>>> users > > >>>>> >>>>>>>>>>>>>> can turn up the maxBuffersNumber to control the > > >>>>> overdraft > > >>>>> >>>>>> buffer > > >>>>> >>>>>>>>>> size. > > >>>>> >>>>>>>>>>>>>> Also, I‘d like to add some information. For safety, > we > > >>>>> >>>> should > > >>>>> >>>>>>> limit > > >>>>> >>>>>>>>>> the > > >>>>> >>>>>>>>>>>>>> maximum number of overdraft segments that each > > >>>>> >>>>> LocalBufferPool > > >>>>> >>>>>>>>>>>>>> can apply for. > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> Why do we limit it? > > >>>>> >>>>>>>>>>>>>> Some operators don't check the > > >>>>> `recordWriter.isAvailable` > > >>>>> >>>>>> during > > >>>>> >>>>>>>>>>>>>> processing records, such as LegacySource. I have > > >>>>> mentioned > > >>>>> >>>> it > > >>>>> >>>>>> in > > >>>>> >>>>>>>>>>>>>> FLINK-26759 [1]. 
I'm not sure if there are other > > cases. > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> If don't add the limitation, the LegacySource will > use > > >>>>> up > > >>>>> >>>> all > > >>>>> >>>>>>>>>> remaining > > >>>>> >>>>>>>>>>>>>> memory in the NetworkBufferPool when the > backpressure > > is > > >>>>> >>>>>> severe. > > >>>>> >>>>>>>>>>>>>> How to limit it? > > >>>>> >>>>>>>>>>>>>> I prefer to hard code the > > >>>>> >>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` > > >>>>> >>>>>>>>>>>>>> in the constructor of LocalBufferPool. The > > >>>>> >>>>> maxOverdraftBuffers > > >>>>> >>>>>> is > > >>>>> >>>>>>>>>> just > > >>>>> >>>>>>>>>>>>>> for safety, and it should be enough for most flink > > >>>>> jobs. Or > > >>>>> >>>>> we > > >>>>> >>>>>>> can > > >>>>> >>>>>>>>>> set > > >>>>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, > > >>>>> 10)` > > >>>>> >>>> to > > >>>>> >>>>>>> handle > > >>>>> >>>>>>>>>>>>>> some jobs of low parallelism. > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> Also if user don't enable the Unaligned Checkpoint, > we > > >>>>> can > > >>>>> >>>>> set > > >>>>> >>>>>>>>>>>>>> maxOverdraftBuffers=0 in the constructor of > > >>>>> >>>> LocalBufferPool. > > >>>>> >>>>>>>> Because > > >>>>> >>>>>>>>>>>>>> the overdraft isn't useful for the Aligned > Checkpoint. > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot. > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> [1] > https://issues.apache.org/jira/browse/FLINK-26759 > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> Best wishes > > >>>>> >>>>>>>>>>>>>> fanrui > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov < > > >>>>> >>>>>>>>>> kaa....@yandex.com> > > >>>>> >>>>>>>>>>>>>> wrote: > > >>>>> >>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> Hi fanrui, > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP. > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is good idea and > it > > >>>>> >>>> should > > >>>>> >>>>>>> help > > >>>>> >>>>>>>> in > > >>>>> >>>>>>>>>>>>>>> described above cases. Here are my thoughts about > > >>>>> >>>>>> configuration: > > >>>>> >>>>>>>>>>>>>>> Please, correct me if I am wrong but as I > understand > > >>>>> right > > >>>>> >>>>> now > > >>>>> >>>>>>> we > > >>>>> >>>>>>>>>> have > > >>>>> >>>>>>>>>>>>>>> following calculation. > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber(per TaskManager) = Network > > >>>>> >>>>> memory(calculated > > >>>>> >>>>>>> via > > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.fraction, > > >>>>> >>>>>>>> taskmanager.memory.network.min, > > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and total memory > > size) / > > >>>>> >>>>>>>>>>>>>>> taskmanager.memory.segment-size. > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber(per TaskManager) = (exclusive > > >>>>> >>>> buffers > > >>>>> >>>>> * > > >>>>> >>>>>>>>>>>>>>> parallelism + floating buffers) * subtasks number > in > > >>>>> >>>>>> TaskManager > > >>>>> >>>>>>>>>>>>>>> buffersInUseNumber = real number of buffers which > > used > > >>>>> at > > >>>>> >>>>>>> current > > >>>>> >>>>>>>>>>>>>>> moment(always <= requiredBuffersNumber) > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to > > >>>>> >>>>>>> maxBuffersNumber > > >>>>> >>>>>>>>>> which > > >>>>> >>>>>>>>>>>>>>> allows Flink work predictibly. 
But if > > >>>>> >>>> requiredBuffersNumber > > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber sometimes it is also fine(but not > > >>>>> good) > > >>>>> >>>>> since > > >>>>> >>>>>>> not > > >>>>> >>>>>>>>>> all > > >>>>> >>>>>>>>>>>>>>> required buffers really mandatory(e.g. it is ok if > > >>>>> Flink > > >>>>> >>>> can > > >>>>> >>>>>> not > > >>>>> >>>>>>>>>>>>>>> allocate floating buffers) > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, > as I > > >>>>> >>>>>> understand > > >>>>> >>>>>>>>>> Flink > > >>>>> >>>>>>>>>>>>>>> just never use these leftovers > > >>>>> buffers(maxBuffersNumber - > > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber). Which I propose to use. ( > we > > >>>>> can > > >>>>> >>>>>> actualy > > >>>>> >>>>>>>> use > > >>>>> >>>>>>>>>>>>>>> even difference 'requiredBuffersNumber - > > >>>>> >>>> buffersInUseNumber' > > >>>>> >>>>>>> since > > >>>>> >>>>>>>>>> if > > >>>>> >>>>>>>>>>>>>>> one TaskManager contains several operators > including > > >>>>> >>>>> 'window' > > >>>>> >>>>>>>> which > > >>>>> >>>>>>>>>> can > > >>>>> >>>>>>>>>>>>>>> temporally borrow buffers from the global pool). > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> My proposal, more specificaly(it relates only to > > >>>>> >>>> requesting > > >>>>> >>>>>>>> buffers > > >>>>> >>>>>>>>>>>>>>> during processing single record while switching to > > >>>>> >>>>>> unavalability > > >>>>> >>>>>>>>>>>> between > > >>>>> >>>>>>>>>>>>>>> records should be the same as we have it now): > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> * If one more buffer requested but > > maxBuffersPerChannel > > >>>>> >>>>>> reached, > > >>>>> >>>>>>>>>> then > > >>>>> >>>>>>>>>>>>>>> just ignore this limitation and allocate this > buffers > > >>>>> from > > >>>>> >>>>> any > > >>>>> >>>>>>>>>>>>>>> place(from LocalBufferPool if it has something yet > > >>>>> >>>> otherwise > > >>>>> >>>>>>> from > > >>>>> >>>>>>>>>>>>>>> NetworkBufferPool) > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds limit, then temporally > > >>>>> >>>> allocate > > >>>>> >>>>>> it > > >>>>> >>>>>>>> from > > >>>>> >>>>>>>>>>>>>>> NetworkBufferPool while it has something to > allocate > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't > > work, > > >>>>> >>>> but I > > >>>>> >>>>>>> like > > >>>>> >>>>>>>> it > > >>>>> >>>>>>>>>>>>>>> since on the one hand, it work from the scratch > > without > > >>>>> >>>> any > > >>>>> >>>>>>>>>>>>>>> configuration, on the other hand, it can be > > >>>>> configuration > > >>>>> >>>> by > > >>>>> >>>>>>>>>> changing > > >>>>> >>>>>>>>>>>>>>> proportion of maxBuffersNumber and > > >>>>> requiredBuffersNumber. > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> The last thing that I want to say, I don't really > > want > > >>>>> to > > >>>>> >>>>>>>> implement > > >>>>> >>>>>>>>>> new > > >>>>> >>>>>>>>>>>>>>> configuration since even now it is not clear how to > > >>>>> >>>>> correctly > > >>>>> >>>>>>>>>> configure > > >>>>> >>>>>>>>>>>>>>> network buffers with existing configuration and I > > don't > > >>>>> >>>> want > > >>>>> >>>>>> to > > >>>>> >>>>>>>>>>>>>>> complicate it, especially if it will be possible to > > >>>>> >>>> resolve > > >>>>> >>>>>> the > > >>>>> >>>>>>>>>> problem > > >>>>> >>>>>>>>>>>>>>> automatically(as described above). 
> > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers > > >>>>> >>>> correct? > > >>>>> >>>>>>>>>>>>>>> -- > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> Best regards, > > >>>>> >>>>>>>>>>>>>>> Anton Kalashnikov > > >>>>> >>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>> 27.04.2022 07:46, rui fan пишет: > > >>>>> >>>>>>>>>>>>>>>> Hi everyone, > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major > > feature > > >>>>> of > > >>>>> >>>>>> Flink. > > >>>>> >>>>>>>> It > > >>>>> >>>>>>>>>>>>>>>> effectively solves the problem of checkpoint > timeout > > >>>>> or > > >>>>> >>>>> slow > > >>>>> >>>>>>>>>>>>>>>> checkpoint when backpressure is severe. > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> We found that UC(Unaligned Checkpoint) does not > work > > >>>>> well > > >>>>> >>>>>> when > > >>>>> >>>>>>>> the > > >>>>> >>>>>>>>>>>>>>>> back pressure is severe and multiple output > buffers > > >>>>> are > > >>>>> >>>>>>> required > > >>>>> >>>>>>>> to > > >>>>> >>>>>>>>>>>>>>>> process a single record. FLINK-14396 [2] also > > >>>>> mentioned > > >>>>> >>>>> this > > >>>>> >>>>>>>> issue > > >>>>> >>>>>>>>>>>>>>>> before. So we propose the overdraft buffer to > solve > > >>>>> it. > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> I created FLINK-26762[3] and FLIP-227[4] to detail > > the > > >>>>> >>>>>>> overdraft > > >>>>> >>>>>>>>>>>>>>>> buffer mechanism. After discussing with Anton > > >>>>> >>>> Kalashnikov, > > >>>>> >>>>>>> there > > >>>>> >>>>>>>>>> are > > >>>>> >>>>>>>>>>>>>>>> still some points to discuss: > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> * There are already a lot of buffer-related > > >>>>> >>>>>> configurations. > > >>>>> >>>>>>>> Do > > >>>>> >>>>>>>>>> we > > >>>>> >>>>>>>>>>>>>>>> need to add a new configuration for the > > >>>>> overdraft > > >>>>> >>>>>> buffer? > > >>>>> >>>>>>>>>>>>>>>> * Where should the overdraft buffer use > > memory? > > >>>>> >>>>>>>>>>>>>>>> * If the overdraft-buffer uses the memory > > >>>>> remaining > > >>>>> >>>> in > > >>>>> >>>>>> the > > >>>>> >>>>>>>>>>>>>>>> NetworkBufferPool, no new configuration > > needs > > >>>>> to be > > >>>>> >>>>>>> added. > > >>>>> >>>>>>>>>>>>>>>> * If adding a new configuration: > > >>>>> >>>>>>>>>>>>>>>> o Should we set the > overdraft-memory-size > > >>>>> at the > > >>>>> >>>> TM > > >>>>> >>>>>>> level > > >>>>> >>>>>>>>>> or > > >>>>> >>>>>>>>>>>> the > > >>>>> >>>>>>>>>>>>>>>> Task level? > > >>>>> >>>>>>>>>>>>>>>> o Or set overdraft-buffers to indicate > the > > >>>>> number > > >>>>> >>>>> of > > >>>>> >>>>>>>>>>>>>>>> memory-segments that can be overdrawn. > > >>>>> >>>>>>>>>>>>>>>> o What is the default value? How to set > > >>>>> sensible > > >>>>> >>>>>>>> defaults? > > >>>>> >>>>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it > > >>>>> using > > >>>>> >>>>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets > overdraft-buffers > > >>>>> at > > >>>>> >>>>> Task > > >>>>> >>>>>>>> level, > > >>>>> >>>>>>>>>>>>>>>> and default value is 10. That is: each > > LocalBufferPool > > >>>>> >>>> can > > >>>>> >>>>>>>>>> overdraw up > > >>>>> >>>>>>>>>>>>>>>> to 10 memory-segments. > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback! 
> > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> Thanks, > > >>>>> >>>>>>>>>>>>>>>> fanrui > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >>>>>>>>>>>>>>>> [1] > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >> > > >>>>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints > > >>>>> >>>>>>>>>>>>>>>> [2] > > https://issues.apache.org/jira/browse/FLINK-14396 > > >>>>> >>>>>>>>>>>>>>>> [3] > > https://issues.apache.org/jira/browse/FLINK-26762 > > >>>>> >>>>>>>>>>>>>>>> [4] > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >> > > >>>>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > > >>>>> >>>>>>>>>>>>>>>> [5] > > >>>>> >>>>>>>>>>>>>>>> > > >>>>> >> > > >>>>> > > > https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe > > >>>>> >>>>>>>>>>>>>>>> [6] > > >>>>> https://github.com/apache/flink-benchmarks/pull/54 > > >>>>> >>>>>>>>>>>> -- > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> Best regards, > > >>>>> >>>>>>>>>>>> Anton Kalashnikov > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>>>> > > >>>>> >>>>>>>>>> -- > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> Best regards, > > >>>>> >>>>>>>>>> Anton Kalashnikov > > >>>>> >>>>>>>>>> > > >>>>> >>>>>>>>>> > > >>>>> >> -- > > >>>>> >> > > >>>>> >> Best regards, > > >>>>> >> Anton Kalashnikov > > >>>>> >> > > >>>>> >> > > >>>>> > > >>>> > > >