Hi, Thanks for the answers.
> we may still need to discuss whether the > overdraft/reserve/spare should use extra buffers or buffers > in (exclusive + floating buffers)? and > These things resolve the different problems (at least as I see that). > The current hardcoded "1" says that we switch "availability" to > "unavailability" when one more buffer is left(actually a little less > than one buffer since we write the last piece of data to this last > buffer). The overdraft feature doesn't change this logic we still want > to switch to "unavailability" in such a way but if we are already in > "unavailability" and we want more buffers then we can take "overdraft > number" more. So we can not avoid this hardcoded "1" since we need to > understand when we should switch to "unavailability" Ok, I see. So it seems to me that both of you have in mind to keep the buffer pools as they are right now, but if we are in the middle of processing a record, we can request extra overdraft buffers on top of those? This is another way to implement the overdraft to what I was thinking. I was thinking about something like keeping the "overdraft" or more precisely buffer "reserve" in the buffer pool. I think my version would be easier to implement, because it is just fiddling with min/max buffers calculation and slightly modified `checkAvailability()` logic. On the other hand what you have in mind would better utilise the available memory, right? It would require more code changes (how would we know when we are allowed to request the overdraft?). However, in this case, I would be tempted to set the number of overdraft buffers by default to `Integer.MAX_VALUE`, and let the system request as many buffers as necessary. The only downside that I can think of (apart of higher complexity) would be higher chance of hitting a known/unsolved deadlock [1] in a scenario: - downstream task hasn't yet started - upstream task requests overdraft and uses all available memory segments from the global pool - upstream task is blocked, because downstream task hasn't started yet and can not consume any data - downstream task tries to start, but can not, as there are no available buffers > BTW, for watermark, the number of buffers it needs is > numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, > the watermark won't block in requestMemory. and > the best overdraft size will be equal to parallelism. That's a lot of buffers. I don't think we need that many for broadcasting watermarks. Watermarks are small, and remember that every subpartition has some partially filled/empty WIP buffer, so the vast majority of subpartitions will not need to request a new buffer. Best, Piotrek [1] https://issues.apache.org/jira/browse/FLINK-13203 wt., 3 maj 2022 o 17:15 Anton Kalashnikov <kaa....@yandex.com> napisał(a): > Hi, > > > >> Do you mean to ignore it while processing records, but keep using > `maxBuffersPerChannel` when calculating the availability of the output? > > > Yes, it is correct. > > > >> Would it be a big issue if we changed it to check if at least > "overdraft number of buffers are available", where "overdraft number" is > configurable, instead of the currently hardcoded value of "1"? > > > These things resolve the different problems (at least as I see that). > The current hardcoded "1" says that we switch "availability" to > "unavailability" when one more buffer is left(actually a little less > than one buffer since we write the last piece of data to this last > buffer). The overdraft feature doesn't change this logic we still want > to switch to "unavailability" in such a way but if we are already in > "unavailability" and we want more buffers then we can take "overdraft > number" more. So we can not avoid this hardcoded "1" since we need to > understand when we should switch to "unavailability" > > > -- About "reserve" vs "overdraft" > > As Fanrui mentioned above, perhaps, the best overdraft size will be > equal to parallelism. Also, the user can set any value he wants. So even > if parallelism is small(~5) but the user's flatmap produces a lot of > data, the user can set 10 or even more. Which almost double the max > buffers and it will be impossible to reserve. At least we need to figure > out how to protect from such cases (the limit for an overdraft?). So > actually it looks even more difficult than increasing the maximum buffers. > > I want to emphasize that overdraft buffers are soft configuration which > means it takes as many buffers as the global buffers pool has > available(maybe zero) but less than this configured value. It is also > important to notice that perhaps, not many subtasks in TaskManager will > be using this feature so we don't actually need a lot of available > buffers for every subtask(Here, I mean that if we have only one > window/flatmap operator and many other operators, then one TaskManager > will have many ordinary subtasks which don't actually need overdraft and > several subtasks that needs this feature). But in case of reservation, > we will reserve some buffers for all operators even if they don't really > need it. > > > -- Legacy source problem > > If we still want to change max buffers then it is problem for > LegacySources(since every subtask of source will always use these > overdraft). But right now, I think that we can force to set 0 overdraft > buffers for legacy subtasks in configuration during execution(if it is > not too late for changing configuration in this place). > > > 03.05.2022 14:11, rui fan пишет: > > Hi > > > > Thanks for Martijn Visser and Piotrek's feedback. I agree with > > ignoring the legacy source, it will affect our design. User should > > use the new Source Api as much as possible. > > > > Hi Piotrek, we may still need to discuss whether the > > overdraft/reserve/spare should use extra buffers or buffers > > in (exclusive + floating buffers)? They have some differences. > > > > If it uses extra buffers: > > 1.The LocalBufferPool will be available when (usedBuffers + 1 > > <= currentPoolSize) and all subpartitions don't reach the > > maxBuffersPerChannel. > > > > If it uses the buffers in (exclusive + floating buffers): > > 1. The LocalBufferPool will be available when (usedBuffers + > > overdraftBuffers <= currentPoolSize) and all subpartitions > > don't reach the maxBuffersPerChannel. > > 2. For low parallelism jobs, if overdraftBuffers is large(>8), the > > usedBuffers will be small. That is the LocalBufferPool will be > > easily unavailable. For throughput, if users turn up the > > overdraft buffers, they need to turn up exclusive or floating > > buffers. It also affects the InputChannel, and it's is unfriendly > > to users. > > > > So I prefer the overdraft to use extra buffers. > > > > > > BTW, for watermark, the number of buffers it needs is > > numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions, > > the watermark won't block in requestMemory. But it has > > 2 problems: > > 1. It needs more overdraft buffers. If the overdraft uses > > (exclusive + floating buffers), there will be fewer buffers > > available. Throughput may be affected. > > 2. The numberOfSubpartitions is different for each Task. > > So if users want to cover watermark using this feature, > > they don't know how to set the overdraftBuffers more r > > easonably. And if the parallelism is changed, users still > > need to change overdraftBuffers. It is unfriendly to users. > > > > So I propose we support overdraftBuffers=-1, It means > > we will automatically set overdraftBuffers=numberOfSubpartitions > > in the Constructor of LocalBufferPool. > > > > Please correct me if I'm wrong. > > > > Thanks > > fanrui > > > > On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org> > wrote: > > > >> Hi fanrui, > >> > >>> Do you mean don't add the extra buffers? We just use (exclusive > buffers * > >>> parallelism + floating buffers)? The LocalBufferPool will be available > >> when > >>> (usedBuffers+overdraftBuffers <= > >> exclusiveBuffers*parallelism+floatingBuffers) > >>> and all subpartitions don't reach the maxBuffersPerChannel, right? > >> I'm not sure. Definitely we would need to adjust the minimum number of > the > >> required buffers, just as we did when we were implementing the non > blocking > >> outputs and adding availability logic to LocalBufferPool. Back then we > >> added "+ 1" to the minimum number of buffers. Currently this logic is > >> located NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition: > >> > >>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1; > >> For performance reasons, we always require at least one buffer per > >> sub-partition. Otherwise performance falls drastically. Now if we > require 5 > >> overdraft buffers for output to be available, we need to have them on > top > >> of those "one buffer per sub-partition". So the logic should be changed > to: > >> > >>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + > >> numOverdraftBuffers; > >> > >> Regarding increasing the number of max buffers I'm not sure. As long as > >> "overdraft << max number of buffers", because all buffers on the outputs > >> are shared across all sub-partitions. If we have 5 overdraft buffers, > and > >> parallelism of 100, it doesn't matter in the grand scheme of things if > we > >> make the output available if at least one single buffer is available or > at > >> least 5 buffers are available out of ~200 (100 * 2 + 8). So effects of > >> increasing the overdraft from 1 to for example 5 should be negligible. > For > >> small parallelism, like 5, increasing overdraft from 1 to 5 still > increases > >> the overdraft by only about 25%. So maybe we can keep the max as it is? > >> > >> If so, maybe we should change the name from "overdraft" to "buffer > reserve" > >> or "spare buffers"? And document it as "number of buffers kept in > reserve > >> in case of flatMap/firing timers/huge records"? > >> > >> What do you think Fenrui, Anton? > >> > >> Re LegacySources. I agree we can kind of ignore them in the new > features, > >> as long as we don't brake the existing deployments too much. > >> > >> Best, > >> Piotrek > >> > >> wt., 3 maj 2022 o 09:20 Martijn Visser <mart...@ververica.com> > napisał(a): > >> > >>> Hi everyone, > >>> > >>> Just wanted to chip in on the discussion of legacy sources: IMHO, we > >> should > >>> not focus too much on improving/adding capabilities for legacy sources. > >> We > >>> want to persuade and push users to use the new Source API. Yes, this > >> means > >>> that there's work required by the end users to port any custom source > to > >>> the new interface. The benefits of the new Source API should outweigh > >> this. > >>> Anything that we build to support multiple interfaces means adding more > >>> complexity and more possibilities for bugs. Let's try to make our > lives a > >>> little bit easier. > >>> > >>> Best regards, > >>> > >>> Martijn Visser > >>> https://twitter.com/MartijnVisser82 > >>> https://github.com/MartijnVisser > >>> > >>> > >>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote: > >>> > >>>> Hi Piotrek > >>>> > >>>>> Do you mean to ignore it while processing records, but keep using > >>>>> `maxBuffersPerChannel` when calculating the availability of the > >> output? > >>>> I think yes, and please Anton Kalashnikov to help double check. > >>>> > >>>>> +1 for just having this as a separate configuration. Is it a big > >>> problem > >>>>> that legacy sources would be ignoring it? Note that we already have > >>>>> effectively hardcoded a single overdraft buffer. > >>>>> `LocalBufferPool#checkAvailability` checks if there is a single > >> buffer > >>>>> available and this works the same for all tasks (including legacy > >>> source > >>>>> tasks). Would it be a big issue if we changed it to check if at least > >>>>> "overdraft number of buffers are available", where "overdraft number" > >>> is > >>>>> configurable, instead of the currently hardcoded value of "1"? > >>>> Do you mean don't add the extra buffers? We just use (exclusive > >> buffers * > >>>> parallelism + floating buffers)? The LocalBufferPool will be available > >>> when > >>>> (usedBuffers+overdraftBuffers <= > >>>> exclusiveBuffers*parallelism+floatingBuffers) > >>>> and all subpartitions don't reach the maxBuffersPerChannel, right? > >>>> > >>>> If yes, I think it can solve the problem of legacy source. There may > be > >>>> some impact. If overdraftBuffers is large and only one buffer is used > >> to > >>>> process a single record, exclusive buffers*parallelism + floating > >> buffers > >>>> cannot be used. It may only be possible to use (exclusive buffers * > >>>> parallelism > >>>> + floating buffers - overdraft buffers + 1). For throughput, if turn > up > >>> the > >>>> overdraft buffers, the flink user needs to turn up exclusive or > >> floating > >>>> buffers. And it also affects the InputChannel. > >>>> > >>>> If not, I don't think it can solve the problem of legacy source. The > >>> legacy > >>>> source don't check isAvailable, If there are the extra buffers, legacy > >>>> source > >>>> will use them up until block in requestMemory. > >>>> > >>>> > >>>> Thanks > >>>> fanrui > >>>> > >>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org> > >>>> wrote: > >>>> > >>>>> Hi, > >>>>> > >>>>> +1 for the general proposal from my side. It would be a nice > >> workaround > >>>>> flatMaps, WindowOperators and large records issues with unaligned > >>>>> checkpoints. > >>>>> > >>>>>> The first task is about ignoring max buffers per channel. This > >> means > >>> if > >>>>>> we request a memory segment from LocalBufferPool and the > >>>>>> maxBuffersPerChannel is reached for this channel, we just ignore > >> that > >>>>>> and continue to allocate buffer while LocalBufferPool has it(it is > >>>>>> actually not a overdraft). > >>>>> Do you mean to ignore it while processing records, but keep using > >>>>> `maxBuffersPerChannel` when calculating the availability of the > >> output? > >>>>>> The second task is about the real overdraft. I am pretty convinced > >>> now > >>>>>> that we, unfortunately, need configuration for limitation of > >>> overdraft > >>>>>> number(because it is not ok if one subtask allocates all buffers of > >>> one > >>>>>> TaskManager considering that several different jobs can be > >> submitted > >>> on > >>>>>> this TaskManager). So idea is to have > >>>>>> maxOverdraftBuffersPerPartition(technically to say per > >>>> LocalBufferPool). > >>>>>> In this case, when a limit of buffers in LocalBufferPool is > >> reached, > >>>>>> LocalBufferPool can request additionally from NetworkBufferPool up > >> to > >>>>>> maxOverdraftBuffersPerPartition buffers. > >>>>> +1 for just having this as a separate configuration. Is it a big > >>> problem > >>>>> that legacy sources would be ignoring it? Note that we already have > >>>>> effectively hardcoded a single overdraft buffer. > >>>>> `LocalBufferPool#checkAvailability` checks if there is a single > >> buffer > >>>>> available and this works the same for all tasks (including legacy > >>> source > >>>>> tasks). Would it be a big issue if we changed it to check if at least > >>>>> "overdraft number of buffers are available", where "overdraft number" > >>> is > >>>>> configurable, instead of the currently hardcoded value of "1"? > >>>>> > >>>>> Best, > >>>>> Piotrek > >>>>> > >>>>> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> napisał(a): > >>>>> > >>>>>> Let me add some information about the LegacySource. > >>>>>> > >>>>>> If we want to disable the overdraft buffer for LegacySource. > >>>>>> Could we add the enableOverdraft in LocalBufferPool? > >>>>>> The default value is false. If the getAvailableFuture is called, > >>>>>> change enableOverdraft=true. It indicates whether there are > >>>>>> checks isAvailable elsewhere. > >>>>>> > >>>>>> I don't think it is elegant, but it's safe. Please correct me if > >> I'm > >>>>> wrong. > >>>>>> Thanks > >>>>>> fanrui > >>>>>> > >>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> > >>> wrote: > >>>>>>> Hi, > >>>>>>> > >>>>>>> Thanks for your quick response. > >>>>>>> > >>>>>>> For question 1/2/3, we think they are clear. We just need to > >>> discuss > >>>>> the > >>>>>>> default value in PR. > >>>>>>> > >>>>>>> For the legacy source, you are right. It's difficult for general > >>>>>>> implementation. > >>>>>>> Currently, we implement ensureRecordWriterIsAvailable() in > >>>>>>> SourceFunction.SourceContext. And call it in our common > >>> LegacySource, > >>>>>>> e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume > >> kafka, > >>> so > >>>>>>> fixing FlinkKafkaConsumer solved most of our problems. > >>>>>>> > >>>>>>> Core code: > >>>>>>> ``` > >>>>>>> public void ensureRecordWriterIsAvailable() { > >>>>>>> if (recordWriter == null > >>>>>>> || > >>>>>>> > >>> > !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, > >>>>>>> false) > >>>>>>> || recordWriter.isAvailable()) { > >>>>>>> return; > >>>>>>> } > >>>>>>> > >>>>>>> CompletableFuture<?> resumeFuture = > >>>>>> recordWriter.getAvailableFuture(); > >>>>>>> try { > >>>>>>> resumeFuture.get(); > >>>>>>> } catch (Throwable ignored) { > >>>>>>> } > >>>>>>> } > >>>>>>> ``` > >>>>>>> > >>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable() > >>>>>>> before synchronized (checkpointLock) and collects records. > >>>>>>> Please let me know if there is a better solution. > >>>>>>> > >>>>>>> Thanks > >>>>>>> fanrui > >>>>>>> > >>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov < > >>>> kaa....@yandex.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi. > >>>>>>>> > >>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two > >>>> commits > >>>>>> in a > >>>>>>>> PR? > >>>>>>>> > >>>>>>>> Perhaps, the separated ticket will be better since this task has > >>>> fewer > >>>>>>>> questions but we should find a solution for LegacySource first. > >>>>>>>> > >>>>>>>> -- 2. For the first task, if the flink user disables the > >>> Unaligned > >>>>>>>> Checkpoint, do we ignore max buffers per channel? Because > >> the > >>>>>>>> overdraft > >>>>>>>> isn't useful for the Aligned Checkpoint, it still needs to > >>> wait > >>>>> for > >>>>>>>> downstream Task to consume. > >>>>>>>> > >>>>>>>> I think that the logic should be the same for AC and UC. As I > >>>>>> understand, > >>>>>>>> the overdraft maybe is not really helpful for AC but it doesn't > >>> make > >>>>> it > >>>>>>>> worse as well. > >>>>>>>> > >>>>>>>> 3. For the second task > >>>>>>>> -- - The default value of maxOverdraftBuffersPerPartition > >> may > >>>>> also > >>>>>>>> need > >>>>>>>> to be discussed. > >>>>>>>> > >>>>>>>> I think it should be a pretty small value or even 0 since it > >> kind > >>> of > >>>>>>>> optimization and user should understand what they do(especially > >> if > >>>> we > >>>>>>>> implement the first task). > >>>>>>>> > >>>>>>>> -- - If the user disables the Unaligned Checkpoint, can we > >>> set > >>>>> the > >>>>>>>> maxOverdraftBuffersPerPartition=0? Because the overdraft > >>>> isn't > >>>>>>>> useful for > >>>>>>>> the Aligned Checkpoint. > >>>>>>>> > >>>>>>>> The same answer that above, if the overdraft doesn't make > >>>> degradation > >>>>>> for > >>>>>>>> the Aligned Checkpoint I don't think that we should make > >>> difference > >>>>>> between > >>>>>>>> AC and UC. > >>>>>>>> > >>>>>>>> 4. For the legacy source > >>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up to > >>>>>>>> maxOverdraftBuffersPerPartition buffers. > >>>>>>>> - If disabling the UC, it doesn't use the overdraft > >> buffer. > >>>>>>>> - Do you think it's ok? > >>>>>>>> > >>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all > >>> since > >>>>> it > >>>>>>>> can lead to undesirable results especially if the limit is high. > >>> At > >>>>>> least, > >>>>>>>> as I understand, it will always work in overdraft mode and it > >> will > >>>>>> borrow > >>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool > >> which > >>>> can > >>>>>> lead > >>>>>>>> to degradation of other subtasks on the same TaskManager. > >>>>>>>> > >>>>>>>> -- - Actually, we added the checkAvailable logic for > >>>> LegacySource > >>>>>> in > >>>>>>>> our > >>>>>>>> internal version. It works well. > >>>>>>>> > >>>>>>>> I don't really understand how it is possible for general case > >>>>>> considering > >>>>>>>> that each user has their own implementation of > >>> LegacySourceOperator > >>>>>>>> -- 5. For the benchmark, do you have any suggestions? I > >>> submitted > >>>>> the > >>>>>> PR > >>>>>>>> [1]. > >>>>>>>> > >>>>>>>> I haven't looked at it yet, but I'll try to do it soon. > >>>>>>>> > >>>>>>>> > >>>>>>>> 29.04.2022 14:14, rui fan пишет: > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> Thanks for your feedback. I have a servel of questions. > >>>>>>>>> > >>>>>>>>> 1. Do you mean split this into two JIRAs or two PRs or two > >>>>> commits > >>>>>>>> in a > >>>>>>>>> PR? > >>>>>>>>> 2. For the first task, if the flink user disables the > >>>> Unaligned > >>>>>>>>> Checkpoint, do we ignore max buffers per channel? Because > >>> the > >>>>>>>> overdraft > >>>>>>>>> isn't useful for the Aligned Checkpoint, it still needs to > >>>> wait > >>>>>> for > >>>>>>>>> downstream Task to consume. > >>>>>>>>> 3. For the second task > >>>>>>>>> - The default value of maxOverdraftBuffersPerPartition > >>> may > >>>>> also > >>>>>>>> need > >>>>>>>>> to be discussed. > >>>>>>>>> - If the user disables the Unaligned Checkpoint, can we > >>> set > >>>>> the > >>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the > >> overdraft > >>>>> isn't > >>>>>>>> useful for > >>>>>>>>> the Aligned Checkpoint. > >>>>>>>>> 4. For the legacy source > >>>>>>>>> - If enabling the Unaligned Checkpoint, it uses up to > >>>>>>>>> maxOverdraftBuffersPerPartition buffers. > >>>>>>>>> - If disabling the UC, it doesn't use the overdraft > >>> buffer. > >>>>>>>>> - Do you think it's ok? > >>>>>>>>> - Actually, we added the checkAvailable logic for > >>>>> LegacySource > >>>>>>>> in our > >>>>>>>>> internal version. It works well. > >>>>>>>>> 5. For the benchmark, do you have any suggestions? I > >>> submitted > >>>>> the > >>>>>>>> PR > >>>>>>>>> [1]. > >>>>>>>>> > >>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54 > >>>>>>>>> > >>>>>>>>> Thanks > >>>>>>>>> fanrui > >>>>>>>>> > >>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov < > >>>>> kaa....@yandex.com > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> We discuss about it a little with Dawid Wysakowicz. Here is > >>> some > >>>>>>>>>> conclusion: > >>>>>>>>>> > >>>>>>>>>> First of all, let's split this into two tasks. > >>>>>>>>>> > >>>>>>>>>> The first task is about ignoring max buffers per channel. > >> This > >>>>> means > >>>>>> if > >>>>>>>>>> we request a memory segment from LocalBufferPool and the > >>>>>>>>>> maxBuffersPerChannel is reached for this channel, we just > >>> ignore > >>>>> that > >>>>>>>>>> and continue to allocate buffer while LocalBufferPool has > >> it(it > >>>> is > >>>>>>>>>> actually not a overdraft). > >>>>>>>>>> > >>>>>>>>>> The second task is about the real overdraft. I am pretty > >>>> convinced > >>>>>> now > >>>>>>>>>> that we, unfortunately, need configuration for limitation of > >>>>>> overdraft > >>>>>>>>>> number(because it is not ok if one subtask allocates all > >>> buffers > >>>> of > >>>>>> one > >>>>>>>>>> TaskManager considering that several different jobs can be > >>>>> submitted > >>>>>> on > >>>>>>>>>> this TaskManager). So idea is to have > >>>>>>>>>> maxOverdraftBuffersPerPartition(technically to say per > >>>>>>>> LocalBufferPool). > >>>>>>>>>> In this case, when a limit of buffers in LocalBufferPool is > >>>>> reached, > >>>>>>>>>> LocalBufferPool can request additionally from > >> NetworkBufferPool > >>>> up > >>>>> to > >>>>>>>>>> maxOverdraftBuffersPerPartition buffers. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> But it is still not clear how to handle LegacySource since it > >>>>>> actually > >>>>>>>>>> works as unlimited flatmap and it will always work in > >> overdraft > >>>>> mode > >>>>>>>>>> which is not a target. So we still need to think about that. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 29.04.2022 11:11, rui fan пишет: > >>>>>>>>>>> Hi Anton Kalashnikov, > >>>>>>>>>>> > >>>>>>>>>>> I think you agree with we should limit the maximum number of > >>>>>> overdraft > >>>>>>>>>>> segments that each LocalBufferPool can apply for, right? > >>>>>>>>>>> > >>>>>>>>>>> I prefer to hard code the maxOverdraftBuffers due to don't > >> add > >>>> the > >>>>>> new > >>>>>>>>>>> configuration. And I hope to hear more from the community. > >>>>>>>>>>> > >>>>>>>>>>> Best wishes > >>>>>>>>>>> fanrui > >>>>>>>>>>> > >>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan < > >>> 1996fan...@gmail.com> > >>>>>>>> wrote: > >>>>>>>>>>>> Hi Anton Kalashnikov, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for your very clear reply, I think you are totally > >>>> right. > >>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as > >>> the > >>>>>>>>>>>> overdraft buffer, it won't need the new buffer > >>>>> configuration.Flink > >>>>>>>> users > >>>>>>>>>>>> can turn up the maxBuffersNumber to control the overdraft > >>>> buffer > >>>>>>>> size. > >>>>>>>>>>>> Also, I‘d like to add some information. For safety, we > >> should > >>>>> limit > >>>>>>>> the > >>>>>>>>>>>> maximum number of overdraft segments that each > >>> LocalBufferPool > >>>>>>>>>>>> can apply for. > >>>>>>>>>>>> > >>>>>>>>>>>> Why do we limit it? > >>>>>>>>>>>> Some operators don't check the `recordWriter.isAvailable` > >>>> during > >>>>>>>>>>>> processing records, such as LegacySource. I have mentioned > >> it > >>>> in > >>>>>>>>>>>> FLINK-26759 [1]. I'm not sure if there are other cases. > >>>>>>>>>>>> > >>>>>>>>>>>> If don't add the limitation, the LegacySource will use up > >> all > >>>>>>>> remaining > >>>>>>>>>>>> memory in the NetworkBufferPool when the backpressure is > >>>> severe. > >>>>>>>>>>>> How to limit it? > >>>>>>>>>>>> I prefer to hard code the > >>>>>> `maxOverdraftBuffers=numberOfSubpartitions` > >>>>>>>>>>>> in the constructor of LocalBufferPool. The > >>> maxOverdraftBuffers > >>>> is > >>>>>>>> just > >>>>>>>>>>>> for safety, and it should be enough for most flink jobs. Or > >>> we > >>>>> can > >>>>>>>> set > >>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` > >> to > >>>>> handle > >>>>>>>>>>>> some jobs of low parallelism. > >>>>>>>>>>>> > >>>>>>>>>>>> Also if user don't enable the Unaligned Checkpoint, we can > >>> set > >>>>>>>>>>>> maxOverdraftBuffers=0 in the constructor of > >> LocalBufferPool. > >>>>>> Because > >>>>>>>>>>>> the overdraft isn't useful for the Aligned Checkpoint. > >>>>>>>>>>>> > >>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot. > >>>>>>>>>>>> > >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759 > >>>>>>>>>>>> > >>>>>>>>>>>> Best wishes > >>>>>>>>>>>> fanrui > >>>>>>>>>>>> > >>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov < > >>>>>>>> kaa....@yandex.com> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi fanrui, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for creating the FLIP. > >>>>>>>>>>>>> > >>>>>>>>>>>>> In general, I think the overdraft is good idea and it > >> should > >>>>> help > >>>>>> in > >>>>>>>>>>>>> described above cases. Here are my thoughts about > >>>> configuration: > >>>>>>>>>>>>> Please, correct me if I am wrong but as I understand right > >>> now > >>>>> we > >>>>>>>> have > >>>>>>>>>>>>> following calculation. > >>>>>>>>>>>>> > >>>>>>>>>>>>> maxBuffersNumber(per TaskManager) = Network > >>> memory(calculated > >>>>> via > >>>>>>>>>>>>> taskmanager.memory.network.fraction, > >>>>>> taskmanager.memory.network.min, > >>>>>>>>>>>>> taskmanager.memory.network.max and total memory size) / > >>>>>>>>>>>>> taskmanager.memory.segment-size. > >>>>>>>>>>>>> > >>>>>>>>>>>>> requiredBuffersNumber(per TaskManager) = (exclusive > >> buffers > >>> * > >>>>>>>>>>>>> parallelism + floating buffers) * subtasks number in > >>>> TaskManager > >>>>>>>>>>>>> buffersInUseNumber = real number of buffers which used at > >>>>> current > >>>>>>>>>>>>> moment(always <= requiredBuffersNumber) > >>>>>>>>>>>>> > >>>>>>>>>>>>> Ideally requiredBuffersNumber should be equal to > >>>>> maxBuffersNumber > >>>>>>>> which > >>>>>>>>>>>>> allows Flink work predictibly. But if > >> requiredBuffersNumber > >>>>>>>>>>>>> maxBuffersNumber sometimes it is also fine(but not good) > >>> since > >>>>> not > >>>>>>>> all > >>>>>>>>>>>>> required buffers really mandatory(e.g. it is ok if Flink > >> can > >>>> not > >>>>>>>>>>>>> allocate floating buffers) > >>>>>>>>>>>>> > >>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I > >>>> understand > >>>>>>>> Flink > >>>>>>>>>>>>> just never use these leftovers buffers(maxBuffersNumber - > >>>>>>>>>>>>> requiredBuffersNumber). Which I propose to use. ( we can > >>>> actualy > >>>>>> use > >>>>>>>>>>>>> even difference 'requiredBuffersNumber - > >> buffersInUseNumber' > >>>>> since > >>>>>>>> if > >>>>>>>>>>>>> one TaskManager contains several operators including > >>> 'window' > >>>>>> which > >>>>>>>> can > >>>>>>>>>>>>> temporally borrow buffers from the global pool). > >>>>>>>>>>>>> > >>>>>>>>>>>>> My proposal, more specificaly(it relates only to > >> requesting > >>>>>> buffers > >>>>>>>>>>>>> during processing single record while switching to > >>>> unavalability > >>>>>>>>>> between > >>>>>>>>>>>>> records should be the same as we have it now): > >>>>>>>>>>>>> > >>>>>>>>>>>>> * If one more buffer requested but maxBuffersPerChannel > >>>> reached, > >>>>>>>> then > >>>>>>>>>>>>> just ignore this limitation and allocate this buffers from > >>> any > >>>>>>>>>>>>> place(from LocalBufferPool if it has something yet > >> otherwise > >>>>> from > >>>>>>>>>>>>> NetworkBufferPool) > >>>>>>>>>>>>> > >>>>>>>>>>>>> * If LocalBufferPool exceeds limit, then temporally > >> allocate > >>>> it > >>>>>> from > >>>>>>>>>>>>> NetworkBufferPool while it has something to allocate > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Maybe I missed something and this solution won't work, > >> but I > >>>>> like > >>>>>> it > >>>>>>>>>>>>> since on the one hand, it work from the scratch without > >> any > >>>>>>>>>>>>> configuration, on the other hand, it can be configuration > >> by > >>>>>>>> changing > >>>>>>>>>>>>> proportion of maxBuffersNumber and requiredBuffersNumber. > >>>>>>>>>>>>> > >>>>>>>>>>>>> The last thing that I want to say, I don't really want to > >>>>>> implement > >>>>>>>> new > >>>>>>>>>>>>> configuration since even now it is not clear how to > >>> correctly > >>>>>>>> configure > >>>>>>>>>>>>> network buffers with existing configuration and I don't > >> want > >>>> to > >>>>>>>>>>>>> complicate it, especially if it will be possible to > >> resolve > >>>> the > >>>>>>>> problem > >>>>>>>>>>>>> automatically(as described above). > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> So is my understanding about network memory/buffers > >> correct? > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>> Anton Kalashnikov > >>>>>>>>>>>>> > >>>>>>>>>>>>> 27.04.2022 07:46, rui fan пишет: > >>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of > >>>> Flink. > >>>>>> It > >>>>>>>>>>>>>> effectively solves the problem of checkpoint timeout or > >>> slow > >>>>>>>>>>>>>> checkpoint when backpressure is severe. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> We found that UC(Unaligned Checkpoint) does not work well > >>>> when > >>>>>> the > >>>>>>>>>>>>>> back pressure is severe and multiple output buffers are > >>>>> required > >>>>>> to > >>>>>>>>>>>>>> process a single record. FLINK-14396 [2] also mentioned > >>> this > >>>>>> issue > >>>>>>>>>>>>>> before. So we propose the overdraft buffer to solve it. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I created FLINK-26762[3] and FLIP-227[4] to detail the > >>>>> overdraft > >>>>>>>>>>>>>> buffer mechanism. After discussing with Anton > >> Kalashnikov, > >>>>> there > >>>>>>>> are > >>>>>>>>>>>>>> still some points to discuss: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> * There are already a lot of buffer-related > >>>> configurations. > >>>>>> Do > >>>>>>>> we > >>>>>>>>>>>>>> need to add a new configuration for the overdraft > >>>> buffer? > >>>>>>>>>>>>>> * Where should the overdraft buffer use memory? > >>>>>>>>>>>>>> * If the overdraft-buffer uses the memory remaining > >> in > >>>> the > >>>>>>>>>>>>>> NetworkBufferPool, no new configuration needs to be > >>>>> added. > >>>>>>>>>>>>>> * If adding a new configuration: > >>>>>>>>>>>>>> o Should we set the overdraft-memory-size at the > >> TM > >>>>> level > >>>>>>>> or > >>>>>>>>>> the > >>>>>>>>>>>>>> Task level? > >>>>>>>>>>>>>> o Or set overdraft-buffers to indicate the number > >>> of > >>>>>>>>>>>>>> memory-segments that can be overdrawn. > >>>>>>>>>>>>>> o What is the default value? How to set sensible > >>>>>> defaults? > >>>>>>>>>>>>>> Currently, I implemented a POC [5] and verified it using > >>>>>>>>>>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at > >>> Task > >>>>>> level, > >>>>>>>>>>>>>> and default value is 10. That is: each LocalBufferPool > >> can > >>>>>>>> overdraw up > >>>>>>>>>>>>>> to 10 memory-segments. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Looking forward to your feedback! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> fanrui > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> [1] > >>>>>>>>>>>>>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints > >>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396 > >>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762 > >>>>>>>>>>>>>> [4] > >>>>>>>>>>>>>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer > >>>>>>>>>>>>>> [5] > >>>>>>>>>>>>>> > >> > https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe > >>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54 > >>>>>>>>>> -- > >>>>>>>>>> > >>>>>>>>>> Best regards, > >>>>>>>>>> Anton Kalashnikov > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>> -- > >>>>>>>> > >>>>>>>> Best regards, > >>>>>>>> Anton Kalashnikov > >>>>>>>> > >>>>>>>> > -- > > Best regards, > Anton Kalashnikov > >