Hi again,

After sleeping on this: if both versions (reserve and overdraft) have the same
complexity, I would also prefer the overdraft.
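To make sure we mean the same thing by the two variants, here is a minimal
sketch of the availability logic (all names are made up for illustration;
this is not the actual LocalBufferPool code):

```
// Sketch of the two discussed variants (hypothetical names, not real
// Flink code): variant A keeps a "reserve" inside the pool, variant B
// allows a bounded "overdraft" on top of the pool.
class BufferPoolSketch {

    private final int poolSize;     // segments owned by this local pool
    private final int reserve;      // variant A: segments held back
    private final int maxOverdraft; // variant B: extra segments allowed
    private int used;               // segments currently handed out
    private int overdrawn;          // variant B: extra segments borrowed

    BufferPoolSketch(int poolSize, int reserve, int maxOverdraft) {
        this.poolSize = poolSize;
        this.reserve = reserve;
        this.maxOverdraft = maxOverdraft;
    }

    // Variant A ("reserve"): checked between records. The pool already
    // reports unavailable while only the reserve is left, so regular
    // processing never touches it; only mid-record requests may dip into it.
    boolean isAvailableWithReserve() {
        return poolSize - used > reserve;
    }

    // Variant B ("overdraft"): the between-records check stays exactly as
    // today -- at least one free segment.
    boolean isAvailable() {
        return poolSize - used >= 1;
    }

    // Variant B, mid-record: once the own pool is exhausted, borrow up to
    // maxOverdraft extra segments from the global pool (best effort; the
    // global pool may have nothing left).
    boolean tryRequestWhileProcessingRecord(boolean globalPoolHasSegments) {
        if (used < poolSize) {
            used++;
            return true;
        }
        if (overdrawn < maxOverdraft && globalPoolHasSegments) {
            overdrawn++;
            return true;
        }
        return false; // caller has to block, as it does today
    }
}
```

In both variants the switch to unavailability still happens between records,
which is why the hardcoded "1" that Anton mentions below stays.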
> `Integer.MAX_VALUE` as default value was my idea as well but now, as
> Dawid mentioned, I think it is dangerous since it is too implicit for
> the user and if the user submits one more job for the same TaskManager

As I mentioned, it's not only an issue with multiple jobs. The same problem
can happen with different subtasks from the same job, potentially leading to
the FLINK-13203 deadlock [1]. With FLINK-13203 fixed, I would be in favour of
Integer.MAX_VALUE as the default value, but as it is, I think we should
indeed stay on the safe side and limit it.

> I still don't understand how the "reserve" implementation should be limited.
> I mean if we have X buffers in total and the user sets overdraft equal
> to X we obviously can not reserve all buffers, but how many are we
> allowed to reserve? Should it be a different configuration like
> percentageForReservedBuffers?

The reserve could be defined as a percentage, or as a fixed number of
buffers. But yes: in normal operation a subtask would not use the reserve,
because if numberOfAvailableBuffers < reserve, the output would not be
available. Only in the flatMap/timers/huge-records case could the reserve be
used.

> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
> LocalBufferPool never be available? Can't process data?

Of course we would need to make sure that never happens, so the reserve
should be < the total buffer size.

> 2. If the overdraft buffer uses the extra buffers, when the downstream
> task inputBuffer is insufficient, it should fail to start the job, and
> then restart? When the InputBuffer is initialized, it will apply for
> enough buffers, right?

The failover when the downstream task can not allocate buffers is already
implemented in FLINK-14872 [2]. There is a timeout for how long the task
waits for buffer allocation. However, this doesn't prevent many (potentially
infinitely many) deadlock/restart cycles. IMO the proper solution for [1]
would be 2b as described in the ticket:

> 2b. Assign extra buffers only once all of the tasks are RUNNING. This is a
> simplified version of 2a, without tracking the tasks sink-to-source.

But that's a pre-existing problem and I don't think we have to solve it
before implementing the overdraft. I think we would need to solve it only
before setting Integer.MAX_VALUE as the default for the overdraft. Maybe I
would hesitate to set the overdraft to anything more than a couple of
buffers by default, for the same reason.

> Actually, I totally agree that we don't need a lot of buffers for overdraft and
> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> When we finish this feature and after users use it, if users feedback
> this issue we can discuss again.

+1

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-13203
[2] https://issues.apache.org/jira/browse/FLINK-14872

On Thu, May 5, 2022 at 05:52 rui fan <1996fan...@gmail.com> wrote:

> Hi everyone,
>
> I still have some questions.
>
> 1. If the total buffers of LocalBufferPool <= the reserve buffers, will
> LocalBufferPool never be available? Can't process data?
> 2. If the overdraft buffer uses the extra buffers, when the downstream
> task inputBuffer is insufficient, it should fail to start the job, and
> then restart? When the InputBuffer is initialized, it will apply for
> enough buffers, right?
>
> Also I agree we ignore the overdraftBuffers=numberOfSubpartitions.
> When we finish this feature and after users use it, if users feedback
> this issue we can discuss again.
>
> Thanks
> fanrui
>
> On Wed, May 4, 2022 at 5:29 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hey all,
>>
>> I have not replied in the thread yet, but I was following the discussion.
>>
>> Personally, I like Fanrui's and Anton's idea. As far as I understand it,
>> the idea to distinguish between inside flatMap & outside would be fairly
>> simple, but maybe slightly indirect. The checkAvailability would remain
>> unchanged and it is always checked between separate invocations of the
>> UDF. Therefore the overdraft buffers would not apply there. However, once
>> the pool says it is available, it means it has at least an initial
>> buffer. So any additional request without checking for availability can
>> be considered to be inside of processing a single record. This just does
>> not hold for the LegacySource, as I don't think it actually checks for
>> the availability of buffers in the LocalBufferPool.
>>
>> In the offline chat with Anton, we also discussed whether we need a limit
>> on the number of buffers we could overdraft (or in other words, whether
>> the limit should be equal to Integer.MAX_VALUE), but personally I'd
>> prefer to stay on the safe side and have it limited. The pool of network
>> buffers is shared for the entire TaskManager, so it means it can be
>> shared even across tasks of separate jobs. However, I might be just
>> unnecessarily cautious here.
>>
>> Best,
>>
>> Dawid
>>
>> On 04/05/2022 10:54, Piotr Nowojski wrote:
>> > Hi,
>> >
>> > Thanks for the answers.
>> >
>> >> we may still need to discuss whether the
>> >> overdraft/reserve/spare should use extra buffers or buffers
>> >> in (exclusive + floating buffers)?
>> >
>> > and
>> >
>> >> These things resolve the different problems (at least as I see that).
>> >> The current hardcoded "1" says that we switch "availability" to
>> >> "unavailability" when one more buffer is left (actually a little less
>> >> than one buffer, since we write the last piece of data to this last
>> >> buffer). The overdraft feature doesn't change this logic: we still
>> >> want to switch to "unavailability" in such a way, but if we are
>> >> already in "unavailability" and we want more buffers, then we can take
>> >> "overdraft number" more. So we can not avoid this hardcoded "1", since
>> >> we need to understand when we should switch to "unavailability".
>> >
>> > Ok, I see. So it seems to me that both of you have in mind keeping the
>> > buffer pools as they are right now, but if we are in the middle of
>> > processing a record, we can request extra overdraft buffers on top of
>> > those? This is a different way of implementing the overdraft from what
>> > I was thinking. I was thinking about something like keeping the
>> > "overdraft", or more precisely a buffer "reserve", in the buffer pool.
>> > I think my version would be easier to implement, because it is just
>> > fiddling with the min/max buffers calculation and a slightly modified
>> > `checkAvailability()` logic.
>> >
>> > On the other hand, what you have in mind would better utilise the
>> > available memory, right? It would require more code changes (how would
>> > we know when we are allowed to request the overdraft?). However, in
>> > this case, I would be tempted to set the number of overdraft buffers by
>> > default to `Integer.MAX_VALUE`, and let the system request as many
>> > buffers as necessary.
>> > The only downside that I can think of (apart from the higher
>> > complexity) would be a higher chance of hitting a known/unsolved
>> > deadlock [1] in a scenario:
>> > - downstream task hasn't yet started
>> > - upstream task requests overdraft and uses all available memory
>> > segments from the global pool
>> > - upstream task is blocked, because the downstream task hasn't started
>> > yet and can not consume any data
>> > - downstream task tries to start, but can not, as there are no
>> > available buffers
>> >
>> >> BTW, for watermark, the number of buffers it needs is
>> >> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> >> the watermark won't block in requestMemory.
>> >
>> > and
>> >
>> >> the best overdraft size will be equal to parallelism.
>> >
>> > That's a lot of buffers. I don't think we need that many for
>> > broadcasting watermarks. Watermarks are small, and remember that every
>> > subpartition has some partially filled/empty WIP buffer, so the vast
>> > majority of subpartitions will not need to request a new buffer.
>> >
>> > Best,
>> > Piotrek
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-13203
>> >
>> > On Tue, May 3, 2022 at 17:15 Anton Kalashnikov <kaa....@yandex.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >>> Do you mean to ignore it while processing records, but keep using
>> >>> `maxBuffersPerChannel` when calculating the availability of the
>> >>> output?
>> >>
>> >> Yes, it is correct.
>> >>
>> >>> Would it be a big issue if we changed it to check if at least
>> >>> "overdraft number of buffers are available", where "overdraft number"
>> >>> is configurable, instead of the currently hardcoded value of "1"?
>> >>
>> >> These things resolve the different problems (at least as I see that).
>> >> The current hardcoded "1" says that we switch "availability" to
>> >> "unavailability" when one more buffer is left (actually a little less
>> >> than one buffer, since we write the last piece of data to this last
>> >> buffer). The overdraft feature doesn't change this logic: we still
>> >> want to switch to "unavailability" in such a way, but if we are
>> >> already in "unavailability" and we want more buffers, then we can take
>> >> "overdraft number" more. So we can not avoid this hardcoded "1", since
>> >> we need to understand when we should switch to "unavailability".
>> >>
>> >> -- About "reserve" vs "overdraft"
>> >>
>> >> As Fanrui mentioned above, perhaps the best overdraft size will be
>> >> equal to the parallelism. Also, the user can set any value they want.
>> >> So even if the parallelism is small (~5) but the user's flatmap
>> >> produces a lot of data, the user can set 10 or even more, which almost
>> >> doubles the max buffers and makes it impossible to reserve. At least
>> >> we need to figure out how to protect against such cases (a limit for
>> >> the overdraft?). So actually it looks even more difficult than
>> >> increasing the maximum buffers.
>> >>
>> >> I want to emphasize that overdraft buffers are a soft configuration,
>> >> which means a subtask takes as many buffers as the global buffer pool
>> >> has available (maybe zero), but less than this configured value.
>> >> It is also important to notice that, perhaps, not many subtasks in a
>> >> TaskManager will be using this feature, so we don't actually need a
>> >> lot of available buffers for every subtask. (Here, I mean that if we
>> >> have only one window/flatmap operator and many other operators, then
>> >> one TaskManager will have many ordinary subtasks which don't actually
>> >> need the overdraft, and several subtasks that need this feature.) But
>> >> in the case of a reservation, we will reserve some buffers for all
>> >> operators even if they don't really need it.
>> >>
>> >> -- Legacy source problem
>> >>
>> >> If we still want to change the max buffers, then it is a problem for
>> >> LegacySources (since every subtask of a source will always use these
>> >> overdraft buffers). But right now, I think that we can force 0
>> >> overdraft buffers for legacy source subtasks in the configuration
>> >> during execution (if it is not too late to change the configuration in
>> >> this place).
>> >>
>> >> On 03.05.2022 14:11, rui fan wrote:
>> >>> Hi,
>> >>>
>> >>> Thanks for Martijn Visser's and Piotrek's feedback. I agree with
>> >>> ignoring the legacy source; it will affect our design. Users should
>> >>> use the new Source API as much as possible.
>> >>>
>> >>> Hi Piotrek, we may still need to discuss whether the
>> >>> overdraft/reserve/spare should use extra buffers or buffers in
>> >>> (exclusive + floating buffers)? They have some differences.
>> >>>
>> >>> If it uses extra buffers:
>> >>> 1. The LocalBufferPool will be available when (usedBuffers + 1 <=
>> >>> currentPoolSize) and all subpartitions don't reach the
>> >>> maxBuffersPerChannel.
>> >>>
>> >>> If it uses the buffers in (exclusive + floating buffers):
>> >>> 1. The LocalBufferPool will be available when (usedBuffers +
>> >>> overdraftBuffers <= currentPoolSize) and all subpartitions don't
>> >>> reach the maxBuffersPerChannel.
>> >>> 2. For low-parallelism jobs, if overdraftBuffers is large (>8), the
>> >>> usedBuffers will be small. That is, the LocalBufferPool will easily
>> >>> become unavailable. For throughput, if users turn up the overdraft
>> >>> buffers, they need to turn up the exclusive or floating buffers. It
>> >>> also affects the InputChannel, and it's unfriendly to users.
>> >>>
>> >>> So I prefer the overdraft to use extra buffers.
>> >>>
>> >>> BTW, for the watermark, the number of buffers it needs is
>> >>> numberOfSubpartitions. So if overdraftBuffers=numberOfSubpartitions,
>> >>> the watermark won't block in requestMemory. But it has 2 problems:
>> >>> 1. It needs more overdraft buffers. If the overdraft uses (exclusive
>> >>> + floating buffers), there will be fewer buffers available.
>> >>> Throughput may be affected.
>> >>> 2. The numberOfSubpartitions is different for each Task. So if users
>> >>> want to cover the watermark using this feature, they don't know how
>> >>> to set the overdraftBuffers reasonably. And if the parallelism is
>> >>> changed, users still need to change overdraftBuffers. It is
>> >>> unfriendly to users.
>> >>>
>> >>> So I propose we support overdraftBuffers=-1. It means we will
>> >>> automatically set overdraftBuffers=numberOfSubpartitions in the
>> >>> constructor of LocalBufferPool.
>> >>>
>> >>> Please correct me if I'm wrong.
>> >>>
>> >>> Thanks
>> >>> fanrui
>> >>>
>> >>> On Tue, May 3, 2022 at 4:54 PM Piotr Nowojski <pnowoj...@apache.org>
>> >>> wrote:
>> >>>> Hi fanrui,
>> >>>>
>> >>>>> Do you mean don't add the extra buffers?
>> >>>>> We just use (exclusive buffers * parallelism + floating buffers)?
>> >>>>> The LocalBufferPool will be available when
>> >>>>> (usedBuffers + overdraftBuffers <=
>> >>>>> exclusiveBuffers * parallelism + floatingBuffers)
>> >>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> >>>>
>> >>>> I'm not sure. Definitely we would need to adjust the minimum number
>> >>>> of the required buffers, just as we did when we were implementing
>> >>>> the non-blocking outputs and adding the availability logic to
>> >>>> LocalBufferPool. Back then we added "+ 1" to the minimum number of
>> >>>> buffers. Currently this logic is located in
>> >>>> NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition:
>> >>>>
>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions + 1;
>> >>>>
>> >>>> For performance reasons, we always require at least one buffer per
>> >>>> sub-partition. Otherwise performance falls drastically. Now if we
>> >>>> require 5 overdraft buffers for the output to be available, we need
>> >>>> to have them on top of those "one buffer per sub-partition". So the
>> >>>> logic should be changed to:
>> >>>>
>> >>>>> int min = isSortShuffle ? sortShuffleMinBuffers : numSubpartitions +
>> >>>>> numOverdraftBuffers;
>> >>>>
>> >>>> Regarding increasing the number of max buffers I'm not sure, as long
>> >>>> as "overdraft << max number of buffers", because all buffers on the
>> >>>> outputs are shared across all sub-partitions. If we have 5 overdraft
>> >>>> buffers and a parallelism of 100, it doesn't matter in the grand
>> >>>> scheme of things if we make the output available when at least one
>> >>>> single buffer is available or when at least 5 buffers are available
>> >>>> out of ~200 (100 * 2 + 8). So the effects of increasing the
>> >>>> overdraft from 1 to, for example, 5 should be negligible. For a
>> >>>> small parallelism, like 5, increasing the overdraft from 1 to 5
>> >>>> still increases the overdraft by only about 25%. So maybe we can
>> >>>> keep the max as it is?
>> >>>>
>> >>>> If so, maybe we should change the name from "overdraft" to "buffer
>> >>>> reserve" or "spare buffers"? And document it as "number of buffers
>> >>>> kept in reserve in case of flatMap/firing timers/huge records"?
>> >>>>
>> >>>> What do you think, Fanrui, Anton?
>> >>>>
>> >>>> Re LegacySources: I agree we can kind of ignore them in the new
>> >>>> features, as long as we don't break the existing deployments too
>> >>>> much.
>> >>>>
>> >>>> Best,
>> >>>> Piotrek
>> >>>>
>> >>>> On Tue, May 3, 2022 at 09:20 Martijn Visser <mart...@ververica.com>
>> >>>> wrote:
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> Just wanted to chip in on the discussion of legacy sources: IMHO,
>> >>>>> we should not focus too much on improving/adding capabilities for
>> >>>>> legacy sources. We want to persuade and push users to use the new
>> >>>>> Source API. Yes, this means that there's work required by the end
>> >>>>> users to port any custom source to the new interface. The benefits
>> >>>>> of the new Source API should outweigh this. Anything that we build
>> >>>>> to support multiple interfaces means adding more complexity and
>> >>>>> more possibilities for bugs. Let's try to make our lives a little
>> >>>>> bit easier.
>> >>>>>
>> >>>>> Best regards,
>> >>>>>
>> >>>>> Martijn Visser
>> >>>>> https://twitter.com/MartijnVisser82
>> >>>>> https://github.com/MartijnVisser
>> >>>>>
>> >>>>> On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi Piotrek,
>> >>>>>>
>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>> >>>>>>> output?
>> >>>>>>
>> >>>>>> I think yes, and please, Anton Kalashnikov, help double-check.
>> >>>>>>
>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>> >>>>>>> problem that legacy sources would be ignoring it? Note that we
>> >>>>>>> already have effectively hardcoded a single overdraft buffer.
>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>> >>>>>>> buffer available and this works the same for all tasks (including
>> >>>>>>> legacy source tasks). Would it be a big issue if we changed it to
>> >>>>>>> check if at least "overdraft number of buffers are available",
>> >>>>>>> where "overdraft number" is configurable, instead of the
>> >>>>>>> currently hardcoded value of "1"?
>> >>>>>>
>> >>>>>> Do you mean don't add the extra buffers? We just use (exclusive
>> >>>>>> buffers * parallelism + floating buffers)? The LocalBufferPool
>> >>>>>> will be available when
>> >>>>>> (usedBuffers + overdraftBuffers <=
>> >>>>>> exclusiveBuffers * parallelism + floatingBuffers)
>> >>>>>> and all subpartitions don't reach the maxBuffersPerChannel, right?
>> >>>>>>
>> >>>>>> If yes, I think it can solve the problem of the legacy source.
>> >>>>>> There may be some impact: if overdraftBuffers is large and only
>> >>>>>> one buffer is used to process a single record, (exclusive buffers
>> >>>>>> * parallelism + floating buffers) cannot be fully used. It may
>> >>>>>> only be possible to use (exclusive buffers * parallelism +
>> >>>>>> floating buffers - overdraft buffers + 1). For throughput, if they
>> >>>>>> turn up the overdraft buffers, Flink users need to turn up the
>> >>>>>> exclusive or floating buffers. And it also affects the
>> >>>>>> InputChannel.
>> >>>>>>
>> >>>>>> If not, I don't think it can solve the problem of the legacy
>> >>>>>> source. The legacy source doesn't check isAvailable: if there are
>> >>>>>> extra buffers, the legacy source will use them up until it blocks
>> >>>>>> in requestMemory.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>> fanrui
>> >>>>>>
>> >>>>>> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski
>> >>>>>> <pnowoj...@apache.org> wrote:
>> >>>>>>
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> +1 for the general proposal from my side. It would be a nice
>> >>>>>>> workaround for the flatMap, WindowOperator and large-record
>> >>>>>>> issues with unaligned checkpoints.
>> >>>>>>>
>> >>>>>>>> The first task is about ignoring max buffers per channel. This
>> >>>>>>>> means if we request a memory segment from LocalBufferPool and
>> >>>>>>>> the maxBuffersPerChannel is reached for this channel, we just
>> >>>>>>>> ignore that and continue to allocate buffers while the
>> >>>>>>>> LocalBufferPool has them (it is actually not an overdraft).
>> >>>>>>>
>> >>>>>>> Do you mean to ignore it while processing records, but keep using
>> >>>>>>> `maxBuffersPerChannel` when calculating the availability of the
>> >>>>>>> output?
>> >>>>>>>
>> >>>>>>>> The second task is about the real overdraft.
>> >>>>>>>> I am pretty convinced now that we, unfortunately, need a
>> >>>>>>>> configuration option to limit the overdraft number (because it
>> >>>>>>>> is not ok if one subtask allocates all buffers of one
>> >>>>>>>> TaskManager, considering that several different jobs can be
>> >>>>>>>> submitted on this TaskManager). So the idea is to have
>> >>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>> >>>>>>>> LocalBufferPool). In this case, when the limit of buffers in the
>> >>>>>>>> LocalBufferPool is reached, the LocalBufferPool can request up
>> >>>>>>>> to maxOverdraftBuffersPerPartition additional buffers from the
>> >>>>>>>> NetworkBufferPool.
>> >>>>>>>
>> >>>>>>> +1 for just having this as a separate configuration. Is it a big
>> >>>>>>> problem that legacy sources would be ignoring it? Note that we
>> >>>>>>> already have effectively hardcoded a single overdraft buffer.
>> >>>>>>> `LocalBufferPool#checkAvailability` checks if there is a single
>> >>>>>>> buffer available and this works the same for all tasks (including
>> >>>>>>> legacy source tasks). Would it be a big issue if we changed it to
>> >>>>>>> check if at least "overdraft number of buffers are available",
>> >>>>>>> where "overdraft number" is configurable, instead of the
>> >>>>>>> currently hardcoded value of "1"?
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Piotrek
>> >>>>>>>
>> >>>>>>> On Fri, Apr 29, 2022 at 17:04 rui fan <1996fan...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Let me add some information about the LegacySource.
>> >>>>>>>>
>> >>>>>>>> If we want to disable the overdraft buffer for the LegacySource,
>> >>>>>>>> could we add enableOverdraft in LocalBufferPool? The default
>> >>>>>>>> value is false. If getAvailableFuture is called, change
>> >>>>>>>> enableOverdraft=true. It indicates whether isAvailable is
>> >>>>>>>> checked elsewhere.
>> >>>>>>>>
>> >>>>>>>> I don't think it is elegant, but it's safe. Please correct me if
>> >>>>>>>> I'm wrong.
>> >>>>>>>>
>> >>>>>>>> Thanks
>> >>>>>>>> fanrui
>> >>>>>>>>
>> >>>>>>>> On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>> Hi,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for your quick response.
>> >>>>>>>>>
>> >>>>>>>>> For questions 1/2/3, we think they are clear. We just need to
>> >>>>>>>>> discuss the default value in the PR.
>> >>>>>>>>>
>> >>>>>>>>> For the legacy source, you are right, it's difficult to fix in
>> >>>>>>>>> a general way. Currently, we implement
>> >>>>>>>>> ensureRecordWriterIsAvailable() in SourceFunction.SourceContext
>> >>>>>>>>> and call it in our common LegacySources, e.g.
>> >>>>>>>>> FlinkKafkaConsumer. Over 90% of our Flink jobs consume Kafka,
>> >>>>>>>>> so fixing FlinkKafkaConsumer solved most of our problems.
>> >>>>>>>>>
>> >>>>>>>>> Core code:
>> >>>>>>>>> ```
>> >>>>>>>>> public void ensureRecordWriterIsAvailable() {
>> >>>>>>>>>     if (recordWriter == null
>> >>>>>>>>>             || !configuration.getBoolean(
>> >>>>>>>>>                     ExecutionCheckpointingOptions.ENABLE_UNALIGNED, false)
>> >>>>>>>>>             || recordWriter.isAvailable()) {
>> >>>>>>>>>         return;
>> >>>>>>>>>     }
>> >>>>>>>>>
>> >>>>>>>>>     CompletableFuture<?> resumeFuture = recordWriter.getAvailableFuture();
>> >>>>>>>>>     try {
>> >>>>>>>>>         resumeFuture.get();
>> >>>>>>>>>     } catch (Throwable ignored) {
>> >>>>>>>>>     }
>> >>>>>>>>> }
>> >>>>>>>>> ```
>> >>>>>>>>>
>> >>>>>>>>> The LegacySource calls
>> >>>>>>>>> sourceContext.ensureRecordWriterIsAvailable() before
>> >>>>>>>>> synchronized (checkpointLock) and collecting records. Please
>> >>>>>>>>> let me know if there is a better solution.
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>> fanrui
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov
>> >>>>>>>>> <kaa....@yandex.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi.
>> >>>>>>>>>>
>> >>>>>>>>>> -- 1. Do you mean split this into two JIRAs or two PRs or two
>> >>>>>>>>>> commits in a PR?
>> >>>>>>>>>>
>> >>>>>>>>>> Perhaps a separate ticket would be better, since this task has
>> >>>>>>>>>> fewer questions, but we should find a solution for the
>> >>>>>>>>>> LegacySource first.
>> >>>>>>>>>>
>> >>>>>>>>>> -- 2. For the first task, if the Flink user disables the
>> >>>>>>>>>> Unaligned Checkpoint, do we ignore max buffers per channel?
>> >>>>>>>>>> Because the overdraft isn't useful for the Aligned Checkpoint,
>> >>>>>>>>>> it still needs to wait for the downstream Task to consume.
>> >>>>>>>>>>
>> >>>>>>>>>> I think that the logic should be the same for AC and UC. As I
>> >>>>>>>>>> understand it, the overdraft is maybe not really helpful for
>> >>>>>>>>>> AC, but it doesn't make it worse either.
>> >>>>>>>>>>
>> >>>>>>>>>> 3. For the second task
>> >>>>>>>>>> -- - The default value of maxOverdraftBuffersPerPartition may
>> >>>>>>>>>> also need to be discussed.
>> >>>>>>>>>>
>> >>>>>>>>>> I think it should be a pretty small value or even 0, since it
>> >>>>>>>>>> is kind of an optimization and users should understand what
>> >>>>>>>>>> they are doing (especially if we implement the first task).
>> >>>>>>>>>>
>> >>>>>>>>>> -- - If the user disables the Unaligned Checkpoint, can we set
>> >>>>>>>>>> maxOverdraftBuffersPerPartition=0? Because the overdraft isn't
>> >>>>>>>>>> useful for the Aligned Checkpoint.
>> >>>>>>>>>>
>> >>>>>>>>>> The same answer as above: if the overdraft doesn't cause a
>> >>>>>>>>>> degradation for the Aligned Checkpoint, I don't think we
>> >>>>>>>>>> should make a difference between AC and UC.
>> >>>>>>>>>>
>> >>>>>>>>>> 4. For the legacy source
>> >>>>>>>>>> -- - If enabling the Unaligned Checkpoint, it uses up to
>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers.
>> >>>>>>>>>> - If disabling the UC, it doesn't use the overdraft buffer.
>> >>>>>>>>>> - Do you think it's ok?
>> >>>>>>>>>>
>> >>>>>>>>>> Ideally, I don't want to use the overdraft for the
>> >>>>>>>>>> LegacySource at all, since it can lead to undesirable results,
>> >>>>>>>>>> especially if the limit is high.
>> >>>>>>>>>> At least, as I understand it, it will always work in
>> >>>>>>>>>> overdraft mode and it will borrow
>> >>>>>>>>>> maxOverdraftBuffersPerPartition buffers from the global pool,
>> >>>>>>>>>> which can lead to a degradation of other subtasks on the same
>> >>>>>>>>>> TaskManager.
>> >>>>>>>>>>
>> >>>>>>>>>> -- - Actually, we added the checkAvailable logic for
>> >>>>>>>>>> LegacySource in our internal version. It works well.
>> >>>>>>>>>>
>> >>>>>>>>>> I don't really understand how it is possible for the general
>> >>>>>>>>>> case, considering that each user has their own implementation
>> >>>>>>>>>> of the LegacySourceOperator.
>> >>>>>>>>>>
>> >>>>>>>>>> -- 5. For the benchmark, do you have any suggestions? I
>> >>>>>>>>>> submitted the PR [1].
>> >>>>>>>>>>
>> >>>>>>>>>> I haven't looked at it yet, but I'll try to do it soon.
>> >>>>>>>>>>
>> >>>>>>>>>> On 29.04.2022 14:14, rui fan wrote:
>> >>>>>>>>>>> Hi,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for your feedback. I have several questions.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 1. Do you mean split this into two JIRAs or two PRs or two
>> >>>>>>>>>>> commits in a PR?
>> >>>>>>>>>>> 2. For the first task, if the Flink user disables the
>> >>>>>>>>>>> Unaligned Checkpoint, do we ignore max buffers per channel?
>> >>>>>>>>>>> Because the overdraft isn't useful for the Aligned
>> >>>>>>>>>>> Checkpoint, it still needs to wait for the downstream Task
>> >>>>>>>>>>> to consume.
>> >>>>>>>>>>> 3. For the second task
>> >>>>>>>>>>>    - The default value of maxOverdraftBuffersPerPartition
>> >>>>>>>>>>>    may also need to be discussed.
>> >>>>>>>>>>>    - If the user disables the Unaligned Checkpoint, can we
>> >>>>>>>>>>>    set maxOverdraftBuffersPerPartition=0? Because the
>> >>>>>>>>>>>    overdraft isn't useful for the Aligned Checkpoint.
>> >>>>>>>>>>> 4. For the legacy source
>> >>>>>>>>>>>    - If enabling the Unaligned Checkpoint, it uses up to
>> >>>>>>>>>>>    maxOverdraftBuffersPerPartition buffers.
>> >>>>>>>>>>>    - If disabling the UC, it doesn't use the overdraft
>> >>>>>>>>>>>    buffer.
>> >>>>>>>>>>>    - Do you think it's ok?
>> >>>>>>>>>>>    - Actually, we added the checkAvailable logic for
>> >>>>>>>>>>>    LegacySource in our internal version. It works well.
>> >>>>>>>>>>> 5. For the benchmark, do you have any suggestions? I
>> >>>>>>>>>>> submitted the PR [1].
>> >>>>>>>>>>>
>> >>>>>>>>>>> [1] https://github.com/apache/flink-benchmarks/pull/54
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks
>> >>>>>>>>>>> fanrui
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov
>> >>>>>>>>>>> <kaa....@yandex.com> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> We discussed this a little with Dawid Wysakowicz. Here are
>> >>>>>>>>>>>> some conclusions:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> First of all, let's split this into two tasks.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The first task is about ignoring max buffers per channel.
>> >>>>>>>>>>>> This means if we request a memory segment from the
>> >>>>>>>>>>>> LocalBufferPool and the maxBuffersPerChannel is reached for
>> >>>>>>>>>>>> this channel, we just ignore that and continue to allocate
>> >>>>>>>>>>>> buffers while the LocalBufferPool has them (it is actually
>> >>>>>>>>>>>> not an overdraft).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The second task is about the real overdraft. I am pretty
>> >>>>>>>>>>>> convinced now that we, unfortunately, need a configuration
>> >>>>>>>>>>>> option to limit the overdraft number (because it is not ok
>> >>>>>>>>>>>> if one subtask allocates all buffers of one TaskManager,
>> >>>>>>>>>>>> considering that several different jobs can be submitted on
>> >>>>>>>>>>>> this TaskManager). So the idea is to have
>> >>>>>>>>>>>> maxOverdraftBuffersPerPartition (technically speaking, per
>> >>>>>>>>>>>> LocalBufferPool). In this case, when the limit of buffers in
>> >>>>>>>>>>>> the LocalBufferPool is reached, the LocalBufferPool can
>> >>>>>>>>>>>> request up to maxOverdraftBuffersPerPartition additional
>> >>>>>>>>>>>> buffers from the NetworkBufferPool.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> But it is still not clear how to handle the LegacySource,
>> >>>>>>>>>>>> since it actually works as an unlimited flatmap and it will
>> >>>>>>>>>>>> always work in overdraft mode, which is not the target. So
>> >>>>>>>>>>>> we still need to think about that.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 29.04.2022 11:11, rui fan wrote:
>> >>>>>>>>>>>>> Hi Anton Kalashnikov,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I think you agree that we should limit the maximum number
>> >>>>>>>>>>>>> of overdraft segments that each LocalBufferPool can apply
>> >>>>>>>>>>>>> for, right?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I prefer to hard-code the maxOverdraftBuffers so as not to
>> >>>>>>>>>>>>> add a new configuration option. And I hope to hear more
>> >>>>>>>>>>>>> from the community.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Best wishes
>> >>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:39 PM rui fan
>> >>>>>>>>>>>>> <1996fan...@gmail.com> wrote:
>> >>>>>>>>>>>>>> Hi Anton Kalashnikov,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thanks for your very clear reply, I think you are totally
>> >>>>>>>>>>>>>> right.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as
>> >>>>>>>>>>>>>> the overdraft buffer, and it won't need a new buffer
>> >>>>>>>>>>>>>> configuration. Flink users can turn up the
>> >>>>>>>>>>>>>> maxBuffersNumber to control the overdraft buffer size.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Also, I'd like to add some information. For safety, we
>> >>>>>>>>>>>>>> should limit the maximum number of overdraft segments that
>> >>>>>>>>>>>>>> each LocalBufferPool can apply for.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Why do we limit it?
>> >>>>>>>>>>>>>> Some operators don't check `recordWriter.isAvailable`
>> >>>>>>>>>>>>>> while processing records, such as the LegacySource. I have
>> >>>>>>>>>>>>>> mentioned it in FLINK-26759 [1]. I'm not sure if there are
>> >>>>>>>>>>>>>> other cases.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> If we don't add the limitation, the LegacySource will use
>> >>>>>>>>>>>>>> up all the remaining memory in the NetworkBufferPool when
>> >>>>>>>>>>>>>> the backpressure is severe.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> How to limit it?
>> >>>>>>>>>>>>>> I prefer to hard-code
>> >>>>>>>>>>>>>> `maxOverdraftBuffers=numberOfSubpartitions` in the
>> >>>>>>>>>>>>>> constructor of LocalBufferPool. The maxOverdraftBuffers is
>> >>>>>>>>>>>>>> just for safety, and it should be enough for most Flink
>> >>>>>>>>>>>>>> jobs. Or we can set
>> >>>>>>>>>>>>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)`
>> >>>>>>>>>>>>>> to handle some jobs of low parallelism.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Also, if the user doesn't enable the Unaligned Checkpoint,
>> >>>>>>>>>>>>>> we can set maxOverdraftBuffers=0 in the constructor of
>> >>>>>>>>>>>>>> LocalBufferPool, because the overdraft isn't useful for
>> >>>>>>>>>>>>>> the Aligned Checkpoint.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Please correct me if I'm wrong. Thanks a lot.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best wishes
>> >>>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov
>> >>>>>>>>>>>>>> <kaa....@yandex.com> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hi fanrui,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks for creating the FLIP.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> In general, I think the overdraft is a good idea and it
>> >>>>>>>>>>>>>>> should help in the cases described above. Here are my
>> >>>>>>>>>>>>>>> thoughts about the configuration. Please correct me if I
>> >>>>>>>>>>>>>>> am wrong, but as I understand it, right now we have the
>> >>>>>>>>>>>>>>> following calculation:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> maxBuffersNumber (per TaskManager) = Network memory
>> >>>>>>>>>>>>>>> (calculated via taskmanager.memory.network.fraction,
>> >>>>>>>>>>>>>>> taskmanager.memory.network.min,
>> >>>>>>>>>>>>>>> taskmanager.memory.network.max and the total memory size)
>> >>>>>>>>>>>>>>> / taskmanager.memory.segment-size.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> requiredBuffersNumber (per TaskManager) = (exclusive
>> >>>>>>>>>>>>>>> buffers * parallelism + floating buffers) * number of
>> >>>>>>>>>>>>>>> subtasks in the TaskManager
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> buffersInUseNumber = the real number of buffers in use at
>> >>>>>>>>>>>>>>> the current moment (always <= requiredBuffersNumber)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Ideally, requiredBuffersNumber should be equal to
>> >>>>>>>>>>>>>>> maxBuffersNumber, which allows Flink to work predictably.
>> >>>>>>>>>>>>>>> But if requiredBuffersNumber > maxBuffersNumber,
>> >>>>>>>>>>>>>>> sometimes it is also fine (but not good), since not all
>> >>>>>>>>>>>>>>> required buffers are really mandatory (e.g. it is ok if
>> >>>>>>>>>>>>>>> Flink can not allocate the floating buffers).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
>> >>>>>>>>>>>>>>> understand it, Flink just never uses these leftover
>> >>>>>>>>>>>>>>> buffers (maxBuffersNumber - requiredBuffersNumber), which
>> >>>>>>>>>>>>>>> I propose to use.
>> >>>>>>>>>>>>>>> (We can actually even use the difference
>> >>>>>>>>>>>>>>> 'requiredBuffersNumber - buffersInUseNumber', since one
>> >>>>>>>>>>>>>>> TaskManager can contain several operators, including a
>> >>>>>>>>>>>>>>> 'window' operator, which can temporarily borrow buffers
>> >>>>>>>>>>>>>>> from the global pool.)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> My proposal, more specifically (it relates only to
>> >>>>>>>>>>>>>>> requesting buffers while processing a single record;
>> >>>>>>>>>>>>>>> switching to unavailability between records should stay
>> >>>>>>>>>>>>>>> the same as we have it now):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> * If one more buffer is requested but
>> >>>>>>>>>>>>>>> maxBuffersPerChannel is reached, then just ignore this
>> >>>>>>>>>>>>>>> limitation and allocate this buffer from any place (from
>> >>>>>>>>>>>>>>> the LocalBufferPool if it still has something, otherwise
>> >>>>>>>>>>>>>>> from the NetworkBufferPool).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> * If the LocalBufferPool exceeds its limit, then
>> >>>>>>>>>>>>>>> temporarily allocate from the NetworkBufferPool while it
>> >>>>>>>>>>>>>>> has something to allocate.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Maybe I missed something and this solution won't work,
>> >>>>>>>>>>>>>>> but I like it since, on the one hand, it works out of the
>> >>>>>>>>>>>>>>> box without any configuration, and on the other hand, it
>> >>>>>>>>>>>>>>> can be configured by changing the proportion of
>> >>>>>>>>>>>>>>> maxBuffersNumber and requiredBuffersNumber.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> The last thing I want to say is that I don't really want
>> >>>>>>>>>>>>>>> to implement a new configuration option, since even now
>> >>>>>>>>>>>>>>> it is not clear how to correctly configure the network
>> >>>>>>>>>>>>>>> buffers with the existing configuration, and I don't want
>> >>>>>>>>>>>>>>> to complicate it further, especially if it is possible to
>> >>>>>>>>>>>>>>> resolve the problem automatically (as described above).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> So is my understanding about network memory/buffers
>> >>>>>>>>>>>>>>> correct?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>>>> Anton Kalashnikov
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On 27.04.2022 07:46, rui fan wrote:
>> >>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
>> >>>>>>>>>>>>>>>> Flink. It effectively solves the problem of checkpoint
>> >>>>>>>>>>>>>>>> timeouts or slow checkpoints when backpressure is
>> >>>>>>>>>>>>>>>> severe.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> We found that UC (Unaligned Checkpoint) does not work
>> >>>>>>>>>>>>>>>> well when the back pressure is severe and multiple
>> >>>>>>>>>>>>>>>> output buffers are required to process a single record.
>> >>>>>>>>>>>>>>>> FLINK-14396 [2] also mentioned this issue before. So we
>> >>>>>>>>>>>>>>>> propose the overdraft buffer to solve it.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I created FLINK-26762 [3] and FLIP-227 [4] to detail the
>> >>>>>>>>>>>>>>>> overdraft buffer mechanism.
>> >>>>>>>>>>>>>>>> After discussing with Anton Kalashnikov, there are
>> >>>>>>>>>>>>>>>> still some points to discuss:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> * There are already a lot of buffer-related
>> >>>>>>>>>>>>>>>>   configurations. Do we need to add a new configuration
>> >>>>>>>>>>>>>>>>   for the overdraft buffer?
>> >>>>>>>>>>>>>>>> * Where should the overdraft buffer take its memory
>> >>>>>>>>>>>>>>>>   from?
>> >>>>>>>>>>>>>>>>   * If the overdraft buffer uses the memory remaining
>> >>>>>>>>>>>>>>>>     in the NetworkBufferPool, no new configuration
>> >>>>>>>>>>>>>>>>     needs to be added.
>> >>>>>>>>>>>>>>>>   * If adding a new configuration:
>> >>>>>>>>>>>>>>>>     o Should we set the overdraft-memory-size at the TM
>> >>>>>>>>>>>>>>>>       level or the Task level?
>> >>>>>>>>>>>>>>>>     o Or set overdraft-buffers to indicate the number
>> >>>>>>>>>>>>>>>>       of memory segments that can be overdrawn?
>> >>>>>>>>>>>>>>>>     o What is the default value? How do we set sensible
>> >>>>>>>>>>>>>>>>       defaults?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Currently, I have implemented a POC [5] and verified it
>> >>>>>>>>>>>>>>>> using flink-benchmarks [6]. The POC sets
>> >>>>>>>>>>>>>>>> overdraft-buffers at the Task level, and the default
>> >>>>>>>>>>>>>>>> value is 10. That is: each LocalBufferPool can overdraw
>> >>>>>>>>>>>>>>>> up to 10 memory segments.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Looking forward to your feedback!
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>>>>>> fanrui
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>> >>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
>> >>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
>> >>>>>>>>>>>>>>>> [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
>> >>>>>>>>>>>>>>>> [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
>> >>>>>>>>>>>>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> --
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>> Anton Kalashnikov
>> >>>>>>>>>>
>> >>>>>>>>>> --
>> >>>>>>>>>>
>> >>>>>>>>>> Best regards,
>> >>>>>>>>>> Anton Kalashnikov
>> >>
>> >> --
>> >>
>> >> Best regards,
>> >> Anton Kalashnikov
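As a footnote to the quoted thread: the minimum-buffer adjustment Piotrek
quotes above from NettyShuffleUtils#getMinMaxNetworkBuffersPerResultPartition
would, under the "reserve" variant, amount to roughly the following. This is
a sketch only; the names mirror the quoted snippet, and numOverdraftBuffers
stands in for the proposed, still hypothetical configuration value:

```
// Sketch of the minimum-buffer calculation discussed in the thread, with
// the "+ 1" spare generalized to "+ numOverdraftBuffers". Illustrative
// only, not committed Flink code.
class MinBuffersSketch {

    static int minBuffersPerResultPartition(
            boolean isSortShuffle,
            int sortShuffleMinBuffers,
            int numSubpartitions,
            int numOverdraftBuffers) {
        // Today the non-sort case is numSubpartitions + 1: one buffer per
        // subpartition for performance, plus one spare so the output can
        // become available again.
        return isSortShuffle
                ? sortShuffleMinBuffers
                : numSubpartitions + numOverdraftBuffers;
    }
}
```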