Hi everyone, I confirmed offline with Anton. He replied in an earlier email: "I vote for 5 as 'max-overdraft-buffers-per-gate'." So, as we understand it, everybody agrees on this.
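For anyone skimming the thread, here is a rough sketch of what a per-gate overdraft limit of 5 means. It is only a toy model under simplifying assumptions, not Flink's LocalBufferPool: the class OverdraftPoolSketch and all of its fields and methods are hypothetical names made up for illustration. It assumes that availability is checked only between records, and that extra requests made while a record is being processed may borrow from the shared pool up to the limit.

```java
// Toy model of the overdraft idea discussed in this thread.
// NOT Flink code: every class, field and method name here is hypothetical.
final class OverdraftPoolSketch {
    private final int poolSize;            // ordinary buffers this pool may hold
    private final int maxOverdraftBuffers; // per-gate limit, e.g. 5
    private int globalFreeBuffers;         // buffers left in the shared TaskManager-wide pool
    private int ordinaryInUse;
    private int overdraftInUse;

    OverdraftPoolSketch(int poolSize, int maxOverdraftBuffers, int globalFreeBuffers) {
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
        this.globalFreeBuffers = globalFreeBuffers;
    }

    // Checked between records; overdraft buffers never make the pool "available".
    boolean isAvailable() {
        return ordinaryInUse < poolSize;
    }

    // Called while a record is being emitted.
    // Returns false where a real pool would block the caller.
    boolean requestBuffer() {
        if (ordinaryInUse < poolSize) {
            ordinaryInUse++;
            return true;
        }
        // Soft limit: borrow only if the shared pool still has a buffer
        // and the configured overdraft limit has not been reached.
        if (overdraftInUse < maxOverdraftBuffers && globalFreeBuffers > 0) {
            overdraftInUse++;
            globalFreeBuffers--;
            return true;
        }
        return false;
    }

    int getOverdraftInUse() {
        return overdraftInUse;
    }

    // Demo helper: issue n requests and count how many were granted.
    static int grantCount(OverdraftPoolSketch pool, int n) {
        int granted = 0;
        for (int i = 0; i < n; i++) {
            if (pool.requestBuffer()) {
                granted++;
            }
        }
        return granted;
    }
}
```

In this model a pool of 2 ordinary buffers with a limit of 5 can serve at most 7 requests for one record, and fewer when the shared pool has less than 5 buffers to spare, which is the "soft" behaviour Anton described.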
The FLIP-227 discussion is over, and I've updated FLIP-227. The work will be split into 3 tickets. Thanks again for all the discussion about this FLIP. I will open a vote today.

Best wishes
fanrui

On Fri, May 6, 2022 at 4:57 PM rui fan <1996fan...@gmail.com> wrote:

> Hi
>
> I created FLINK-27530[1] as the parent ticket and added it to the FLIP.
>
> [1] https://issues.apache.org/jira/browse/FLINK-27530
>
> Thanks
> fanrui
>
> On Fri, May 6, 2022 at 4:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> I'm not sure. Maybe 5 will be fine? Anton, Dawid, what do you think?
>>
>> Can you create a parent ticket for the whole FLIP to group all of the
>> issues together?
>>
>> Also, the FLIP should be officially voted on first.
>>
>> Best,
>> Piotrek
>>
>> On Fri, 6 May 2022 at 09:08 rui fan <1996fan...@gmail.com> wrote:
>>
>> > Hi Anton, Piotrek and Dawid,
>> >
>> > Thanks for your help.
>> >
>> > I created FLINK-27522[1] as the first task, and I will finish it asap.
>> >
>> > @Piotrek, for the default value, do you think it should be less
>> > than 5? What do you think about 3? Actually, I think 5 isn't big.
>> > Whether it's 1, 3 or 5 doesn't matter much; the focus is on
>> > reasonably resolving the deadlock problems. Alternatively, I can move
>> > the second task forward first and we can discuss the default value in the PR.
>> >
>> > For the legacySource, I got your idea. I propose we create
>> > a third task to handle it, because it is independent and exists only for
>> > compatibility with the old API. What do you think? I added
>> > the third task to FLIP-227[2].
>> >
>> > If all is ok, I will create a JIRA for the third task and add it to
>> > FLIP-227. I will then develop them in order, from the first task to the
>> > third task.
>> >
>> > Thanks again for your help.
>> > >> > [1] https://issues.apache.org/jira/browse/FLINK-27522 >> > [2] >> > >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer >> > >> > Thanks >> > fanrui >> > >> > On Fri, May 6, 2022 at 3:50 AM Piotr Nowojski <pnowoj...@apache.org> >> > wrote: >> > >> > > Hi fanrui, >> > > >> > > > How to identify legacySource? >> > > >> > > legacy sources are always using the SourceStreamTask class and >> > > SourceStreamTask is used only for legacy sources. But I'm not sure >> how to >> > > enable/disable that. Adding `disableOverdraft()` call in >> SourceStreamTask >> > > would be better compared to relying on the `getAvailableFuture()` call >> > > (isn't it used for back pressure metric anyway?). Ideally we should >> > > enable/disable it in the constructors, but that might be tricky. >> > > >> > > > I prefer it to be between 5 and 10 >> > > >> > > I would vote for a smaller value because of FLINK-13203 >> > > >> > > Piotrek >> > > >> > > >> > > >> > > czw., 5 maj 2022 o 11:49 rui fan <1996fan...@gmail.com> napisał(a): >> > > >> > >> Hi, >> > >> >> > >> Thanks a lot for your discussion. >> > >> >> > >> After several discussions, I think it's clear now. I updated the >> > >> "Proposed Changes" of FLIP-227[1]. If I have something >> > >> missing, please help to add it to FLIP, or add it in the mail >> > >> and I can add it to FLIP. If everything is OK, I will create a >> > >> new JIRA for the first task, and use FLINK-26762[2] as the >> > >> second task. >> > >> >> > >> About the legacy source, do we set maxOverdraftBuffersPerGate=0 >> > >> directly? How to identify legacySource? Or could we add >> > >> the overdraftEnabled in LocalBufferPool? The default value >> > >> is false. If the getAvailableFuture is called, change >> > >> overdraftEnabled=true. >> > >> It indicates whether there are checks isAvailable elsewhere. >> > >> It might be more general, it can cover more cases. 
>> > >> >> > >> Also, I think the default value of 'max-overdraft-buffers-per-gate' >> > >> needs to be confirmed. I prefer it to be between 5 and 10. How >> > >> do you think? >> > >> >> > >> [1] >> > >> >> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer >> > >> [2] https://issues.apache.org/jira/browse/FLINK-26762 >> > >> >> > >> Thanks >> > >> fanrui >> > >> >> > >> On Thu, May 5, 2022 at 4:41 PM Piotr Nowojski <pnowoj...@apache.org> >> > >> wrote: >> > >> >> > >>> Hi again, >> > >>> >> > >>> After sleeping over this, if both versions (reserve and overdraft) >> have >> > >>> the same complexity, I would also prefer the overdraft. >> > >>> >> > >>> > `Integer.MAX_VALUE` as default value was my idea as well but now, >> as >> > >>> > Dawid mentioned, I think it is dangerous since it is too implicit >> for >> > >>> > the user and if the user submits one more job for the same >> TaskManger >> > >>> >> > >>> As I mentioned, it's not only an issue with multiple jobs. The same >> > >>> problem can happen with different subtasks from the same job, >> > potentially >> > >>> leading to the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I >> > would be >> > >>> in favour of Integer.MAX_VALUE to be the default value, but as it >> is, I >> > >>> think we should indeed play on the safe side and limit it. >> > >>> >> > >>> > I still don't understand how should be limited "reserve" >> > >>> implementation. >> > >>> > I mean if we have X buffers in total and the user sets overdraft >> > equal >> > >>> > to X we obviously can not reserve all buffers, but how many we are >> > >>> > allowed to reserve? Should it be a different configuration like >> > >>> > percentegeForReservedBuffers? >> > >>> >> > >>> The reserve could be defined as percentage, or as a fixed number of >> > >>> buffers. But yes. 
>> > >>> In normal operation a subtask would not use the reserve, because if
>> > >>> numberOfAvailableBuffers < reserve, the output would not be available.
>> > >>> Only in the flatMap/timers/huge records case could the reserve be used.
>> > >>>
>> > >>> > 1. If the total buffers of LocalBufferPool <= the reserve buffers,
>> > >>> > will LocalBufferPool never be available? Can't process data?
>> > >>>
>> > >>> Of course we would need to make sure that never happens. So the reserve
>> > >>> should be < total buffer size.
>> > >>>
>> > >>> > 2. If the overdraft buffer use the extra buffers, when the downstream
>> > >>> > task inputBuffer is insufficient, it should fail to start the job, and then
>> > >>> > restart? When the InputBuffer is initialized, it will apply for enough
>> > >>> > buffers, right?
>> > >>>
>> > >>> The failover when downstream cannot allocate buffers is already
>> > >>> implemented in FLINK-14872 [2]. There is a timeout for how long the task
>> > >>> waits for buffer allocation. However, this doesn't prevent many
>> > >>> (potentially infinitely many) deadlock/restart cycles. IMO the proper
>> > >>> solution for [1] would be 2b described in the ticket:
>> > >>>
>> > >>> > 2b. Assign extra buffers only once all of the tasks are RUNNING. This
>> > >>> > is a simplified version of 2a, without tracking the tasks sink-to-source.
>> > >>>
>> > >>> But that's a pre-existing problem and I don't think we have to solve it
>> > >>> before implementing overdraft. I think we would need to solve it only
>> > >>> before setting Integer.MAX_VALUE as the default for the overdraft. For the
>> > >>> same reason, I would hesitate to set the overdraft to anything more than a
>> > >>> couple of buffers by default.
>> > >>> >> > >>> > Actually, I totally agree that we don't need a lot of buffers for >> > >>> overdraft >> > >>> >> > >>> and >> > >>> >> > >>> > Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. >> > >>> > When we finish this feature and after users use it, if users >> feedback >> > >>> > this issue we can discuss again. >> > >>> >> > >>> +1 >> > >>> >> > >>> Piotrek >> > >>> >> > >>> [1] https://issues.apache.org/jira/browse/FLINK-13203 >> > >>> [2] https://issues.apache.org/jira/browse/FLINK-14872 >> > >>> >> > >>> czw., 5 maj 2022 o 05:52 rui fan <1996fan...@gmail.com> napisał(a): >> > >>> >> > >>>> Hi everyone, >> > >>>> >> > >>>> I still have some questions. >> > >>>> >> > >>>> 1. If the total buffers of LocalBufferPool <= the reserve buffers, >> > will >> > >>>> LocalBufferPool never be available? Can't process data? >> > >>>> 2. If the overdraft buffer use the extra buffers, when the >> downstream >> > >>>> task inputBuffer is insufficient, it should fail to start the job, >> and >> > >>>> then >> > >>>> restart? When the InputBuffer is initialized, it will apply for >> enough >> > >>>> buffers, right? >> > >>>> >> > >>>> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions. >> > >>>> When we finish this feature and after users use it, if users >> feedback >> > >>>> this issue we can discuss again. >> > >>>> >> > >>>> Thanks >> > >>>> fanrui >> > >>>> >> > >>>> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz < >> > dwysakow...@apache.org> >> > >>>> wrote: >> > >>>> >> > >>>>> Hey all, >> > >>>>> >> > >>>>> I have not replied in the thread yet, but I was following the >> > >>>>> discussion. >> > >>>>> >> > >>>>> Personally, I like Fanrui's and Anton's idea. As far as I >> understand >> > >>>>> it >> > >>>>> the idea to distinguish between inside flatMap & outside would be >> > >>>>> fairly >> > >>>>> simple, but maybe slightly indirect. 
The checkAvailability would >> > >>>>> remain >> > >>>>> unchanged and it is checked always between separate invocations of >> > the >> > >>>>> UDF. Therefore the overdraft buffers would not apply there. >> However >> > >>>>> once >> > >>>>> the pool says it is available, it means it has at least an initial >> > >>>>> buffer. So any additional request without checking for >> availability >> > >>>>> can >> > >>>>> be considered to be inside of processing a single record. This >> does >> > >>>>> not >> > >>>>> hold just for the LegacySource as I don't think it actually checks >> > for >> > >>>>> the availability of buffers in the LocalBufferPool. >> > >>>>> >> > >>>>> In the offline chat with Anton, we also discussed if we need a >> limit >> > >>>>> of >> > >>>>> the number of buffers we could overdraft (or in other words if the >> > >>>>> limit >> > >>>>> should be equal to Integer.MAX_VALUE), but personally I'd prefer >> to >> > >>>>> stay >> > >>>>> on the safe side and have it limited. The pool of network buffers >> is >> > >>>>> shared for the entire TaskManager, so it means it can be shared >> even >> > >>>>> across tasks of separate jobs. However, I might be just >> unnecessarily >> > >>>>> cautious here. >> > >>>>> >> > >>>>> Best, >> > >>>>> >> > >>>>> Dawid >> > >>>>> >> > >>>>> On 04/05/2022 10:54, Piotr Nowojski wrote: >> > >>>>> > Hi, >> > >>>>> > >> > >>>>> > Thanks for the answers. >> > >>>>> > >> > >>>>> >> we may still need to discuss whether the >> > >>>>> >> overdraft/reserve/spare should use extra buffers or buffers >> > >>>>> >> in (exclusive + floating buffers)? >> > >>>>> > and >> > >>>>> > >> > >>>>> >> These things resolve the different problems (at least as I see >> > >>>>> that). 
>> > >>>>> >> The current hardcoded "1" says that we switch "availability" >> to >> > >>>>> >> "unavailability" when one more buffer is left(actually a little >> > less >> > >>>>> >> than one buffer since we write the last piece of data to this >> last >> > >>>>> >> buffer). The overdraft feature doesn't change this logic we >> still >> > >>>>> want >> > >>>>> >> to switch to "unavailability" in such a way but if we are >> already >> > in >> > >>>>> >> "unavailability" and we want more buffers then we can take >> > >>>>> "overdraft >> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we >> need >> > >>>>> to >> > >>>>> >> understand when we should switch to "unavailability" >> > >>>>> > Ok, I see. So it seems to me that both of you have in mind to >> keep >> > >>>>> the >> > >>>>> > buffer pools as they are right now, but if we are in the middle >> of >> > >>>>> > processing a record, we can request extra overdraft buffers on >> top >> > of >> > >>>>> > those? This is another way to implement the overdraft to what I >> was >> > >>>>> > thinking. I was thinking about something like keeping the >> > >>>>> "overdraft" or >> > >>>>> > more precisely buffer "reserve" in the buffer pool. I think my >> > >>>>> version >> > >>>>> > would be easier to implement, because it is just fiddling with >> > >>>>> min/max >> > >>>>> > buffers calculation and slightly modified `checkAvailability()` >> > >>>>> logic. >> > >>>>> > >> > >>>>> > On the other hand what you have in mind would better utilise >> the >> > >>>>> available >> > >>>>> > memory, right? It would require more code changes (how would we >> > know >> > >>>>> when >> > >>>>> > we are allowed to request the overdraft?). However, in this >> case, I >> > >>>>> would >> > >>>>> > be tempted to set the number of overdraft buffers by default to >> > >>>>> > `Integer.MAX_VALUE`, and let the system request as many buffers >> as >> > >>>>> > necessary. 
The only downside that I can think of (apart of >> higher >> > >>>>> > complexity) would be higher chance of hitting a known/unsolved >> > >>>>> deadlock [1] >> > >>>>> > in a scenario: >> > >>>>> > - downstream task hasn't yet started >> > >>>>> > - upstream task requests overdraft and uses all available memory >> > >>>>> segments >> > >>>>> > from the global pool >> > >>>>> > - upstream task is blocked, because downstream task hasn't >> started >> > >>>>> yet and >> > >>>>> > can not consume any data >> > >>>>> > - downstream task tries to start, but can not, as there are no >> > >>>>> available >> > >>>>> > buffers >> > >>>>> > >> > >>>>> >> BTW, for watermark, the number of buffers it needs is >> > >>>>> >> numberOfSubpartitions. So if >> > overdraftBuffers=numberOfSubpartitions, >> > >>>>> >> the watermark won't block in requestMemory. >> > >>>>> > and >> > >>>>> > >> > >>>>> >> the best overdraft size will be equal to parallelism. >> > >>>>> > That's a lot of buffers. I don't think we need that many for >> > >>>>> broadcasting >> > >>>>> > watermarks. Watermarks are small, and remember that every >> > >>>>> subpartition has >> > >>>>> > some partially filled/empty WIP buffer, so the vast majority of >> > >>>>> > subpartitions will not need to request a new buffer. >> > >>>>> > >> > >>>>> > Best, >> > >>>>> > Piotrek >> > >>>>> > >> > >>>>> > [1] https://issues.apache.org/jira/browse/FLINK-13203 >> > >>>>> > >> > >>>>> > wt., 3 maj 2022 o 17:15 Anton Kalashnikov <kaa....@yandex.com> >> > >>>>> napisał(a): >> > >>>>> > >> > >>>>> >> Hi, >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> >> Do you mean to ignore it while processing records, but >> keep >> > >>>>> using >> > >>>>> >> `maxBuffersPerChannel` when calculating the availability of the >> > >>>>> output? >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> Yes, it is correct. 
>> > >>>>> >> >> > >>>>> >> >> > >>>>> >> >> Would it be a big issue if we changed it to check if at >> least >> > >>>>> >> "overdraft number of buffers are available", where "overdraft >> > >>>>> number" is >> > >>>>> >> configurable, instead of the currently hardcoded value of "1"? >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> These things resolve the different problems (at least as I see >> > >>>>> that). >> > >>>>> >> The current hardcoded "1" says that we switch "availability" >> to >> > >>>>> >> "unavailability" when one more buffer is left(actually a little >> > less >> > >>>>> >> than one buffer since we write the last piece of data to this >> last >> > >>>>> >> buffer). The overdraft feature doesn't change this logic we >> still >> > >>>>> want >> > >>>>> >> to switch to "unavailability" in such a way but if we are >> already >> > in >> > >>>>> >> "unavailability" and we want more buffers then we can take >> > >>>>> "overdraft >> > >>>>> >> number" more. So we can not avoid this hardcoded "1" since we >> need >> > >>>>> to >> > >>>>> >> understand when we should switch to "unavailability" >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> -- About "reserve" vs "overdraft" >> > >>>>> >> >> > >>>>> >> As Fanrui mentioned above, perhaps, the best overdraft size >> will >> > be >> > >>>>> >> equal to parallelism. Also, the user can set any value he >> wants. >> > So >> > >>>>> even >> > >>>>> >> if parallelism is small(~5) but the user's flatmap produces a >> lot >> > of >> > >>>>> >> data, the user can set 10 or even more. Which almost double the >> > max >> > >>>>> >> buffers and it will be impossible to reserve. At least we need >> to >> > >>>>> figure >> > >>>>> >> out how to protect from such cases (the limit for an >> overdraft?). >> > So >> > >>>>> >> actually it looks even more difficult than increasing the >> maximum >> > >>>>> buffers. 
>> > >>>>> >> >> > >>>>> >> I want to emphasize that overdraft buffers are soft >> configuration >> > >>>>> which >> > >>>>> >> means it takes as many buffers as the global buffers pool has >> > >>>>> >> available(maybe zero) but less than this configured value. It >> is >> > >>>>> also >> > >>>>> >> important to notice that perhaps, not many subtasks in >> TaskManager >> > >>>>> will >> > >>>>> >> be using this feature so we don't actually need a lot of >> available >> > >>>>> >> buffers for every subtask(Here, I mean that if we have only one >> > >>>>> >> window/flatmap operator and many other operators, then one >> > >>>>> TaskManager >> > >>>>> >> will have many ordinary subtasks which don't actually need >> > >>>>> overdraft and >> > >>>>> >> several subtasks that needs this feature). But in case of >> > >>>>> reservation, >> > >>>>> >> we will reserve some buffers for all operators even if they >> don't >> > >>>>> really >> > >>>>> >> need it. >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> -- Legacy source problem >> > >>>>> >> >> > >>>>> >> If we still want to change max buffers then it is problem for >> > >>>>> >> LegacySources(since every subtask of source will always use >> these >> > >>>>> >> overdraft). But right now, I think that we can force to set 0 >> > >>>>> overdraft >> > >>>>> >> buffers for legacy subtasks in configuration during >> execution(if >> > it >> > >>>>> is >> > >>>>> >> not too late for changing configuration in this place). >> > >>>>> >> >> > >>>>> >> >> > >>>>> >> 03.05.2022 14:11, rui fan пишет: >> > >>>>> >>> Hi >> > >>>>> >>> >> > >>>>> >>> Thanks for Martijn Visser and Piotrek's feedback. I agree >> with >> > >>>>> >>> ignoring the legacy source, it will affect our design. User >> > should >> > >>>>> >>> use the new Source Api as much as possible. 
>> > >>>>> >>> >> > >>>>> >>> Hi Piotrek, we may still need to discuss whether the >> > >>>>> >>> overdraft/reserve/spare should use extra buffers or buffers >> > >>>>> >>> in (exclusive + floating buffers)? They have some differences. >> > >>>>> >>> >> > >>>>> >>> If it uses extra buffers: >> > >>>>> >>> 1.The LocalBufferPool will be available when (usedBuffers + 1 >> > >>>>> >>> <= currentPoolSize) and all subpartitions don't reach the >> > >>>>> >>> maxBuffersPerChannel. >> > >>>>> >>> >> > >>>>> >>> If it uses the buffers in (exclusive + floating buffers): >> > >>>>> >>> 1. The LocalBufferPool will be available when (usedBuffers + >> > >>>>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions >> > >>>>> >>> don't reach the maxBuffersPerChannel. >> > >>>>> >>> 2. For low parallelism jobs, if overdraftBuffers is large(>8), >> > the >> > >>>>> >>> usedBuffers will be small. That is the LocalBufferPool will be >> > >>>>> >>> easily unavailable. For throughput, if users turn up the >> > >>>>> >>> overdraft buffers, they need to turn up exclusive or floating >> > >>>>> >>> buffers. It also affects the InputChannel, and it's is >> unfriendly >> > >>>>> >>> to users. >> > >>>>> >>> >> > >>>>> >>> So I prefer the overdraft to use extra buffers. >> > >>>>> >>> >> > >>>>> >>> >> > >>>>> >>> BTW, for watermark, the number of buffers it needs is >> > >>>>> >>> numberOfSubpartitions. So if >> > >>>>> overdraftBuffers=numberOfSubpartitions, >> > >>>>> >>> the watermark won't block in requestMemory. But it has >> > >>>>> >>> 2 problems: >> > >>>>> >>> 1. It needs more overdraft buffers. If the overdraft uses >> > >>>>> >>> (exclusive + floating buffers), there will be fewer buffers >> > >>>>> >>> available. Throughput may be affected. >> > >>>>> >>> 2. The numberOfSubpartitions is different for each Task. 
>> > >>>>> >>> So if users want to cover watermarks using this feature,
>> > >>>>> >>> they don't know how to set overdraftBuffers reasonably.
>> > >>>>> >>> And if the parallelism is changed, users still
>> > >>>>> >>> need to change overdraftBuffers. It is unfriendly to users.
>> > >>>>> >>>
>> > >>>>> >>> So I propose we support overdraftBuffers=-1. It means
>> > >>>>> >>> we will automatically set overdraftBuffers=numberOfSubpartitions
>> > >>>>> >>> in the constructor of LocalBufferPool.
>> > >>>>> >>>
>> > >>>>> >>> Please correct me if I'm wrong.
>> > >>>>> >>>
>> > >>>>> >>> Thanks
>> > >>>>> >>> fanrui
>> > >>>>> >>>
>> > >>>>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>> > >>>>> >>>> Hi fanrui,
>> > >>>>> >>>>
>> > >>>>> >>>>> Do you mean don't add the extra buffers? We just use (exclusive buffers *
>> > >>>>> >>>>> parallelism + floating buffers)? The LocalBufferPool will be available when
>> > >>>>> >>>>> (usedBuffers+overdraftBuffers <= exclusiveBuffers*parallelism+floatingBuffers)
>> > >>>>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> > >>>>> >>>> I'm not sure. Definitely we would need to adjust the minimum number of
>> > >>>>> >>>> required buffers, just as we did when we were implementing the non-blocking
>> > >>>>> >>>> outputs and adding availability logic to LocalBufferPool. Back then we
>> > >>>>> >>>> added "+ 1" to the minimum number of buffers. Currently this logic is
>> > >>>>> >>>> located in NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>> > >>>>> >>>>
>> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>> > >>>>> >>>> For performance reasons, we always require at least one buffer per
>> > >>>>> >>>> sub-partition. Otherwise performance falls drastically. Now if we require 5
>> > >>>>> >>>> overdraft buffers for the output to be available, we need to have them on top
>> > >>>>> >>>> of those "one buffer per sub-partition". So the logic should be changed to:
>> > >>>>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + numOverdraftBuffers;
>> > >>>>> >>>>
>> > >>>>> >>>> Regarding increasing the max number of buffers, I'm not sure. It should not
>> > >>>>> >>>> matter as long as "overdraft << max number of buffers", because all buffers on
>> > >>>>> >>>> the outputs are shared across all sub-partitions. If we have 5 overdraft buffers
>> > >>>>> >>>> and a parallelism of 100, it doesn't matter in the grand scheme of things if we
>> > >>>>> >>>> make the output available when at least one single buffer is available or when
>> > >>>>> >>>> at least 5 buffers are available out of ~200 (100 * 2 + 8). So the effects of
>> > >>>>> >>>> increasing the overdraft from 1 to, for example, 5 should be negligible. For
>> > >>>>> >>>> small parallelism, like 5, increasing overdraft from 1 to 5 still increases
>> > >>>>> >>>> the overdraft by only about 25%. So maybe we can keep the max as it is?
>> > >>>>> >>>>
>> > >>>>> >>>> If so, maybe we should change the name from "overdraft" to "buffer reserve"
>> > >>>>> >>>> or "spare buffers"? And document it as "number of buffers kept in reserve
>> > >>>>> >>>> in case of flatMap/firing timers/huge records"?
>> > >>>>> >>>>
>> > >>>>> >>>> What do you think Fanrui, Anton?
>> > >>>>> >>>> >> > >>>>> >>>> Re LegacySources. I agree we can kind of ignore them in the >> new >> > >>>>> >> features, >> > >>>>> >>>> as long as we don't brake the existing deployments too much. >> > >>>>> >>>> >> > >>>>> >>>> Best, >> > >>>>> >>>> Piotrek >> > >>>>> >>>> >> > >>>>> >>>> wt., 3 maj 2022 o 09:20 Martijn Visser < >> mart...@ververica.com> >> > >>>>> >> napisał(a): >> > >>>>> >>>>> Hi everyone, >> > >>>>> >>>>> >> > >>>>> >>>>> Just wanted to chip in on the discussion of legacy sources: >> > >>>>> IMHO, we >> > >>>>> >>>> should >> > >>>>> >>>>> not focus too much on improving/adding capabilities for >> legacy >> > >>>>> sources. >> > >>>>> >>>> We >> > >>>>> >>>>> want to persuade and push users to use the new Source API. >> Yes, >> > >>>>> this >> > >>>>> >>>> means >> > >>>>> >>>>> that there's work required by the end users to port any >> custom >> > >>>>> source >> > >>>>> >> to >> > >>>>> >>>>> the new interface. The benefits of the new Source API should >> > >>>>> outweigh >> > >>>>> >>>> this. >> > >>>>> >>>>> Anything that we build to support multiple interfaces means >> > >>>>> adding more >> > >>>>> >>>>> complexity and more possibilities for bugs. Let's try to >> make >> > our >> > >>>>> >> lives a >> > >>>>> >>>>> little bit easier. >> > >>>>> >>>>> >> > >>>>> >>>>> Best regards, >> > >>>>> >>>>> >> > >>>>> >>>>> Martijn Visser >> > >>>>> >>>>> https://twitter.com/MartijnVisser82 >> > >>>>> >>>>> https://github.com/MartijnVisser >> > >>>>> >>>>> >> > >>>>> >>>>> >> > >>>>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> >> > >>>>> wrote: >> > >>>>> >>>>> >> > >>>>> >>>>>> Hi Piotrek >> > >>>>> >>>>>> >> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but >> keep >> > >>>>> using >> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability >> of >> > the >> > >>>>> >>>> output? >> > >>>>> >>>>>> I think yes, and please Anton Kalashnikov to help double >> > check. 
>> > >>>>> >>>>>> >> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is >> it a >> > >>>>> big >> > >>>>> >>>>> problem >> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we >> > already >> > >>>>> have >> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. >> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a >> > single >> > >>>>> >>>> buffer >> > >>>>> >>>>>>> available and this works the same for all tasks (including >> > >>>>> legacy >> > >>>>> >>>>> source >> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check >> if >> > >>>>> at least >> > >>>>> >>>>>>> "overdraft number of buffers are available", where >> "overdraft >> > >>>>> number" >> > >>>>> >>>>> is >> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of >> > "1"? >> > >>>>> >>>>>> Do you mean don't add the extra buffers? We just use >> > (exclusive >> > >>>>> >>>> buffers * >> > >>>>> >>>>>> parallelism + floating buffers)? The LocalBufferPool will >> be >> > >>>>> available >> > >>>>> >>>>> when >> > >>>>> >>>>>> (usedBuffers+overdraftBuffers <= >> > >>>>> >>>>>> exclusiveBuffers*parallelism+floatingBuffers) >> > >>>>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, >> > >>>>> right? >> > >>>>> >>>>>> >> > >>>>> >>>>>> If yes, I think it can solve the problem of legacy source. >> > >>>>> There may >> > >>>>> >> be >> > >>>>> >>>>>> some impact. If overdraftBuffers is large and only one >> buffer >> > >>>>> is used >> > >>>>> >>>> to >> > >>>>> >>>>>> process a single record, exclusive buffers*parallelism + >> > >>>>> floating >> > >>>>> >>>> buffers >> > >>>>> >>>>>> cannot be used. It may only be possible to use (exclusive >> > >>>>> buffers * >> > >>>>> >>>>>> parallelism >> > >>>>> >>>>>> + floating buffers - overdraft buffers + 1). 
For >> throughput, >> > if >> > >>>>> turn >> > >>>>> >> up >> > >>>>> >>>>> the >> > >>>>> >>>>>> overdraft buffers, the flink user needs to turn up >> exclusive >> > or >> > >>>>> >>>> floating >> > >>>>> >>>>>> buffers. And it also affects the InputChannel. >> > >>>>> >>>>>> >> > >>>>> >>>>>> If not, I don't think it can solve the problem of legacy >> > >>>>> source. The >> > >>>>> >>>>> legacy >> > >>>>> >>>>>> source don't check isAvailable, If there are the extra >> > buffers, >> > >>>>> legacy >> > >>>>> >>>>>> source >> > >>>>> >>>>>> will use them up until block in requestMemory. >> > >>>>> >>>>>> >> > >>>>> >>>>>> >> > >>>>> >>>>>> Thanks >> > >>>>> >>>>>> fanrui >> > >>>>> >>>>>> >> > >>>>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski < >> > >>>>> pnowoj...@apache.org> >> > >>>>> >>>>>> wrote: >> > >>>>> >>>>>> >> > >>>>> >>>>>>> Hi, >> > >>>>> >>>>>>> >> > >>>>> >>>>>>> +1 for the general proposal from my side. It would be a >> nice >> > >>>>> >>>> workaround >> > >>>>> >>>>>>> flatMaps, WindowOperators and large records issues with >> > >>>>> unaligned >> > >>>>> >>>>>>> checkpoints. >> > >>>>> >>>>>>> >> > >>>>> >>>>>>>> The first task is about ignoring max buffers per channel. >> > This >> > >>>>> >>>> means >> > >>>>> >>>>> if >> > >>>>> >>>>>>>> we request a memory segment from LocalBufferPool and the >> > >>>>> >>>>>>>> maxBuffersPerChannel is reached for this channel, we just >> > >>>>> ignore >> > >>>>> >>>> that >> > >>>>> >>>>>>>> and continue to allocate buffer while LocalBufferPool has >> > >>>>> it(it is >> > >>>>> >>>>>>>> actually not a overdraft). >> > >>>>> >>>>>>> Do you mean to ignore it while processing records, but >> keep >> > >>>>> using >> > >>>>> >>>>>>> `maxBuffersPerChannel` when calculating the availability >> of >> > the >> > >>>>> >>>> output? >> > >>>>> >>>>>>>> The second task is about the real overdraft. 
I am pretty >> > >>>>> convinced >> > >>>>> >>>>> now >> > >>>>> >>>>>>>> that we, unfortunately, need configuration for >> limitation of >> > >>>>> >>>>> overdraft >> > >>>>> >>>>>>>> number(because it is not ok if one subtask allocates all >> > >>>>> buffers of >> > >>>>> >>>>> one >> > >>>>> >>>>>>>> TaskManager considering that several different jobs can >> be >> > >>>>> >>>> submitted >> > >>>>> >>>>> on >> > >>>>> >>>>>>>> this TaskManager). So idea is to have >> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition(technically to say per >> > >>>>> >>>>>> LocalBufferPool). >> > >>>>> >>>>>>>> In this case, when a limit of buffers in LocalBufferPool >> is >> > >>>>> >>>> reached, >> > >>>>> >>>>>>>> LocalBufferPool can request additionally from >> > >>>>> NetworkBufferPool up >> > >>>>> >>>> to >> > >>>>> >>>>>>>> maxOverdraftBuffersPerPartition buffers. >> > >>>>> >>>>>>> +1 for just having this as a separate configuration. Is >> it a >> > >>>>> big >> > >>>>> >>>>> problem >> > >>>>> >>>>>>> that legacy sources would be ignoring it? Note that we >> > already >> > >>>>> have >> > >>>>> >>>>>>> effectively hardcoded a single overdraft buffer. >> > >>>>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a >> > single >> > >>>>> >>>> buffer >> > >>>>> >>>>>>> available and this works the same for all tasks (including >> > >>>>> legacy >> > >>>>> >>>>> source >> > >>>>> >>>>>>> tasks). Would it be a big issue if we changed it to check >> if >> > >>>>> at least >> > >>>>> >>>>>>> "overdraft number of buffers are available", where >> "overdraft >> > >>>>> number" >> > >>>>> >>>>> is >> > >>>>> >>>>>>> configurable, instead of the currently hardcoded value of >> > "1"? >> > >>>>> >>>>>>> >> > >>>>> >>>>>>> Best, >> > >>>>> >>>>>>> Piotrek >> > >>>>> >>>>>>> >> > >>>>> >>>>>>> pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> >> > >>>>> napisał(a): >> > >>>>> >>>>>>> >> > >>>>> >>>>>>>> Let me add some information about the LegacySource. 
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> If we want to disable the overdraft buffer for LegacySource, could we add an enableOverdraft flag in LocalBufferPool? The default value is false. If getAvailableFuture is called, change enableOverdraft to true. It indicates whether isAvailable is checked elsewhere.
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if I'm wrong.
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> Thanks
>> > >>>>> >>>>>>>> fanrui
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>>>>
>> > >>>>> >>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Thanks for your quick response.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> For questions 1/2/3, we think they are clear. We just need to discuss the default value in the PR.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> For the legacy source, you are right. A general implementation is difficult. Currently, we implement ensureRecordWriterIsAvailable() in SourceFunction.SourceContext and call it in our common LegacySource implementations, e.g. FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka, so fixing FlinkKafkaConsumer solved most of our problems.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Core code:
>> > >>>>> >>>>>>>>> ```
>> > >>>>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>> > >>>>> >>>>>>>>>     // Nothing to wait for: no writer, UC disabled, or the writer is available.
>> > >>>>> >>>>>>>>>     if (recordWriter == null
>> > >>>>> >>>>>>>>>             || !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>> > >>>>> >>>>>>>>>             || recordWriter.isAvailable()) {
>> > >>>>> >>>>>>>>>         return;
>> > >>>>> >>>>>>>>>     }
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>>     // Block the source thread until the output becomes available again.
>> > >>>>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>> > >>>>> >>>>>>>>>     try {
>> > >>>>> >>>>>>>>>         resumeFuture.get();
>> > >>>>> >>>>>>>>>     } catch (Throwable ignored) {
>> > >>>>> >>>>>>>>>     }
>> > >>>>> >>>>>>>>> }
>> > >>>>> >>>>>>>>> ```
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> LegacySource calls sourceContext.ensureRecordWriterIsAvailable() before entering synchronized (checkpointLock) and collecting records. Please let me know if there is a better solution.
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> Thanks
>> > >>>>> >>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
>> > >>>>> >>>>>>>>>
>> > >>>>> >>>>>>>>>> Hi.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- 1. Do you mean to split this into two JIRAs, two PRs, or two commits in a PR?
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Perhaps a separate ticket will be better, since this task has fewer open questions, but we should find a solution for LegacySource first.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- 2. For the first task, if the Flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for the downstream Task to consume.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I understand, the overdraft may not be really helpful for AC, but it doesn't make it worse either.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> 3. For the second task
>> > >>>>> >>>>>>>>>> -- - The default value of maxOverdraftBuffersPerPartition may also need to be discussed.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it is a kind of optimization and users should understand what they are doing (especially if we implement the first task).
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, can we set maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause degradation for the Aligned Checkpoint, I don't think that we should make a difference between AC and UC.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> 4. For the legacy source
>> > >>>>> >>>>>>>>>> -- - If the Unaligned Checkpoint is enabled, it uses up to maxOverdraftBuffersPerPartition buffers.
>> > >>>>> >>>>>>>>>>    - If UC is disabled, it doesn't use the overdraft buffer.
>> > >>>>> >>>>>>>>>>    - Do you think it's ok?
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Ideally, I don't want to use overdraft for LegacySource at all, since it can lead to undesirable results, especially if the limit is high. At least, as I understand, it will always work in overdraft mode and it will borrow maxOverdraftBuffersPerPartition buffers from the global pool, which can lead to degradation of other subtasks on the same TaskManager.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I don't really understand how it is possible in the general case, considering that each user has their own implementation of LegacySourceOperator.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? I submitted the PR [1].
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> On 29.04.2022 at 14:14, rui fan wrote:
>> > >>>>> >>>>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> Thanks for your feedback.
>> > >>>>> >>>>>>>>>>> I have several questions.
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> 1. Do you mean to split this into two JIRAs, two PRs, or two commits in a PR?
>> > >>>>> >>>>>>>>>>> 2. For the first task, if the Flink user disables the Unaligned Checkpoint, do we ignore max buffers per channel? Because the overdraft isn't useful for the Aligned Checkpoint, it still needs to wait for the downstream Task to consume.
>> > >>>>> >>>>>>>>>>> 3. For the second task
>> > >>>>> >>>>>>>>>>>    - The default value of maxOverdraftBuffersPerPartition may also need to be discussed.
>> > >>>>> >>>>>>>>>>>    - If the user disables the Unaligned Checkpoint, can we set maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for the Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>> 4. For the legacy source
>> > >>>>> >>>>>>>>>>>    - If the Unaligned Checkpoint is enabled, it uses up to maxOverdraftBuffersPerPartition buffers.
>> > >>>>> >>>>>>>>>>>    - If UC is disabled, it doesn't use the overdraft buffer.
>> > >>>>> >>>>>>>>>>>    - Do you think it's ok?
>> > >>>>> >>>>>>>>>>>    - Actually, we added the checkAvailable logic for LegacySource in our internal version. It works well.
>> > >>>>> >>>>>>>>>>> 5. For the benchmark, do you have any suggestions? I submitted the PR [1].
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> Thanks
>> > >>>>> >>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
>> > >>>>> >>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> Hi,
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> We discussed it a little with Dawid Wysakowicz. Here are some conclusions:
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel. This means that if we request a memory segment from LocalBufferPool and maxBuffersPerChannel is reached for this channel, we just ignore that and continue to allocate buffers while LocalBufferPool has them (it is actually not an overdraft).
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty convinced now that we, unfortunately, need a configuration option to limit the number of overdraft buffers (because it is not OK if one subtask allocates all buffers of one TaskManager, considering that several different jobs can be submitted to this TaskManager). So the idea is to have maxOverdraftBuffersPerPartition (technically, per LocalBufferPool). In this case, when the limit of buffers in LocalBufferPool is reached, LocalBufferPool can additionally request up to maxOverdraftBuffersPerPartition buffers from NetworkBufferPool.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> But it is still not clear how to handle LegacySource, since it actually works as an unlimited flatMap and it will always work in overdraft mode, which is not the goal. So we still need to think about that.
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> On 29.04.2022 at 11:11, rui fan wrote:
>> > >>>>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for, right?
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> I prefer to hard-code maxOverdraftBuffers so that we don't add a new configuration option. And I hope to hear more from the community.
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> Best wishes
>> > >>>>> >>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:
>> > >>>>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Thanks for your very clear reply. I think you are totally right.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the overdraft buffer, so no new buffer configuration is needed. Flink users can turn up maxBuffersNumber to control the overdraft buffer size.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we should limit the maximum number of overdraft segments that each LocalBufferPool can apply for.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Why do we limit it?
>> > >>>>> >>>>>>>>>>>>>> Some operators, such as LegacySource, don't check `recordWriter.isAvailable` while processing records. I have mentioned it in FLINK-26759 [1]. I'm not sure if there are other cases.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> If we don't add the limitation, the LegacySource will use up all remaining memory in the NetworkBufferPool when the backpressure is severe.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> How to limit it?
>> > >>>>> >>>>>>>>>>>>>> I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions` in the constructor of LocalBufferPool. The maxOverdraftBuffers limit is just for safety, and it should be enough for most Flink jobs. Or we can set `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle some jobs with low parallelism.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint, we can set maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because the overdraft isn't useful for the Aligned Checkpoint.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> Best wishes
>> > >>>>> >>>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com> wrote:
>> > >>>>> >>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Hi fanrui,
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it should help in the cases described above.
>> > >>>>> >>>>>>>>>>>>>>> Here are my thoughts about configuration:
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Please correct me if I am wrong, but as I understand, right now we have the following calculation.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = network memory (calculated via taskmanager.memory.network.fraction, taskmanager.memory.network.min, taskmanager.memory.network.max and total memory size) / taskmanager.memory.segment-size.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive buffers * parallelism + floating buffers) * number of subtasks in the TaskManager
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> buffersInUseNumber = actual number of buffers in use at the current moment (always <= requiredBuffersNumber)
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which allows Flink to work predictably. But if requiredBuffersNumber > maxBuffersNumber, sometimes it is also fine (but not good), since not all required buffers are really mandatory (e.g. it is OK if Flink cannot allocate floating buffers).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I understand, Flink just never uses these leftover buffers (maxBuffersNumber - requiredBuffersNumber), which I propose to use. (We can actually even use the difference 'requiredBuffersNumber - buffersInUseNumber', since one TaskManager can contain several operators, including 'window', which can temporarily borrow buffers from the global pool.)
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to requesting buffers while processing a single record; switching to unavailability between records should stay the same as it is now):
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> * If one more buffer is requested but maxBuffersPerChannel is reached, then just ignore this limitation and allocate the buffer from any place (from LocalBufferPool if it still has something, otherwise from NetworkBufferPool).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> * If LocalBufferPool exceeds its limit, then temporarily allocate from NetworkBufferPool while it has something to allocate.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work, but I like it since, on the one hand, it works from scratch without any configuration and, on the other hand, it can be configured by changing the proportion of maxBuffersNumber and requiredBuffersNumber.
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> The last thing that I want to say: I don't really want to implement a new configuration option, since even now it is not clear how to correctly configure network buffers with the existing configuration, and I don't want to complicate it, especially if it will be possible to resolve the problem automatically (as described above).
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers correct?
>> > >>>>> >>>>>>>>>>>>>>> --
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>> On 27.04.2022 at 07:46, rui fan wrote:
>> > >>>>> >>>>>>>>>>>>>>>> Hi everyone,
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink.
>> > >>>>> >>>>>>>>>>>>>>>> It effectively solves the problem of checkpoint timeouts or slow checkpoints when backpressure is severe.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work well when the backpressure is severe and multiple output buffers are required to process a single record. FLINK-14396 [2] also mentioned this issue before. So we propose the overdraft buffer to solve it.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft buffer mechanism. After discussing with Anton Kalashnikov, there are still some points to discuss:
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>>   * There are already a lot of buffer-related configurations. Do we need to add a new configuration for the overdraft buffer?
>> > >>>>> >>>>>>>>>>>>>>>>   * Where should the overdraft buffer take its memory from?
>> > >>>>> >>>>>>>>>>>>>>>>   * If the overdraft buffer uses the memory remaining in the NetworkBufferPool, no new configuration needs to be added.
>> > >>>>> >>>>>>>>>>>>>>>>   * If adding a new configuration:
>> > >>>>> >>>>>>>>>>>>>>>>       o Should we set the overdraft-memory-size at the TM level or the Task level?
>> > >>>>> >>>>>>>>>>>>>>>>       o Or set overdraft-buffers to indicate the number of memory segments that can be overdrawn?
>> > >>>>> >>>>>>>>>>>>>>>>       o What is the default value? How to set sensible defaults?
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Currently, I have implemented a POC [5] and verified it using flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level, and the default value is 10. That is: each LocalBufferPool can overdraw up to 10 memory segments.
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> Thanks,
>> > >>>>> >>>>>>>>>>>>>>>> fanrui
>> > >>>>> >>>>>>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> > >>>>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>> > >>>>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>> > >>>>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> > >>>>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> > >>>>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>> > >>>>> >>>>>>>>>>>> --
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>>>
>> > >>>>> >>>>>>>>>> --
>> > >>>>> >>>>>>>>>>
>> > >>>>> >>>>>>>>>> Best regards,
>> > >>>>> >>>>>>>>>> Anton Kalashnikov
>> > >>>>> >>>>>>>>>>
>> > >>>>> >> --
>> > >>>>> >>
>> > >>>>> >> Best regards,
>> > >>>>> >> Anton Kalashnikov
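
For readers following the thread, the overdraft behaviour described in the quoted messages can be sketched in a few lines of Java. This is a minimal model under stated assumptions, not Flink's implementation: `GlobalPool` and `LocalPool` are hypothetical stand-ins for NetworkBufferPool and LocalBufferPool, they only count segments instead of holding memory, and the quota of 2 and overdraft limit of 3 are illustrative values. The loop in `main` plays the role of a task that never checks availability, like the legacy source discussed above.

```java
/**
 * Simplified model of the overdraft idea from this thread.
 * GlobalPool and LocalPool are hypothetical stand-ins, not Flink classes,
 * and they only count segments.
 */
final class OverdraftSketch {

    /** Stand-in for the TaskManager-wide pool shared by all subtasks. */
    static final class GlobalPool {
        private int free;

        GlobalPool(int totalSegments) {
            this.free = totalSegments;
        }

        boolean tryTake() {
            if (free == 0) {
                return false;
            }
            free--;
            return true;
        }

        void giveBack() {
            free++;
        }

        int free() {
            return free;
        }
    }

    /** Stand-in for one local pool: a regular quota plus a bounded overdraft. */
    static final class LocalPool {
        private final GlobalPool global;
        private final int quota;
        private final int maxOverdraft;
        private int regularInUse;
        private int overdraftInUse;

        LocalPool(GlobalPool global, int quota, int maxOverdraft) {
            this.global = global;
            this.quota = quota;
            this.maxOverdraft = maxOverdraft;
        }

        /** A well-behaved task starts a new record only while this is true. */
        boolean isAvailable() {
            return regularInUse < quota && overdraftInUse == 0;
        }

        /** Called mid-record; returns false where a real pool would block. */
        boolean request() {
            if (regularInUse < quota && global.tryTake()) {
                regularInUse++;
                return true;
            }
            // Regular quota exhausted: fall back to the bounded overdraft.
            if (overdraftInUse < maxOverdraft && global.tryTake()) {
                overdraftInUse++;
                return true;
            }
            return false;
        }

        /** Returns one segment to the global pool, overdraft segments first. */
        void recycle() {
            if (overdraftInUse > 0) {
                overdraftInUse--;
            } else if (regularInUse > 0) {
                regularInUse--;
            } else {
                throw new IllegalStateException("nothing to recycle");
            }
            global.giveBack();
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(10);
        LocalPool pool = new LocalPool(global, 2, 3);
        int granted = 0;
        // A task that never checks isAvailable(), like a legacy source.
        while (pool.request()) {
            granted++;
        }
        System.out.println("granted=" + granted + ", left in global pool=" + global.free());
    }
}
```

With a bounded overdraft, such a task is granted 5 segments (2 regular plus 3 overdraft) and then has to wait, which leaves the other 5 segments of the shared pool to the remaining subtasks. Without the bound it would drain the shared pool, which is the concern raised for LegacySource in the thread.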