I really like the new idea. On Thu, Jul 15, 2021 at 11:51 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> Hi Till,
>
> > I assume that buffer sizes are only changed for newly assigned
> > buffers/credits, right? Otherwise, the data could already be on the
> > wire and then it wouldn't fit on the receiver side. Or do we have a
> > back channel mechanism to tell the sender that a part of a buffer
> > needs to be resent once more capacity is available?
>
> Initially, our implementation proposal intended to implement the first
> option. The buffer size would be attached to a credit message, so the
> receiver would first need to allocate a buffer with the updated size and
> send the credit upstream, and the sender would be allowed to send only
> as much data as the credit allows. So there would be no problem with
> changing buffer sizes while something is "on the wire".
>
> However, Anton suggested an even simpler idea to me today. There is
> actually no problem with receivers supporting all buffer sizes up to the
> maximum allowed size (the currently configured memory segment size).
> Thus the new buffer size can be treated as a recommendation by the
> sender. We can announce a new buffer size, and the sender will start
> capping newly requested buffers to that size, but we can still send
> already filled buffers in chunks of any size, as long as they are below
> the max memory segment size. In this way we can leave any already filled
> buffers on the sender side untouched and we do not need to
> partition/slice them before sending them down, making at least the
> initial version even simpler. This way we also do not need to
> distinguish between credits of different sizes. We just announce a
> single value, the "recommended/requested buffer size".
>
> Piotrek
>
> On Thu, Jul 15, 2021 at 17:27, Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Hi everyone,
> >
> > Thanks a lot for creating this FLIP, Anton and Piotr. I think it looks
> > like a very promising solution for speeding up our checkpoints and
> > being able to create them more reliably.
> >
> > Following up on Steven's question: I assume that buffer sizes are only
> > changed for newly assigned buffers/credits, right? Otherwise, the data
> > could already be on the wire and then it wouldn't fit on the receiver
> > side. Or do we have a back channel mechanism to tell the sender that a
> > part of a buffer needs to be resent once more capacity is available?
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi Steven,
> > >
> > > As downstream and upstream nodes are decoupled, if downstream nodes
> > > adjust their buffer size first, there will be a lag until the updated
> > > buffer size information reaches the upstream node. It is a problem,
> > > but it has quite a simple solution that we described in the FLIP
> > > document:
> > >
> > > > Sending the buffer of the right size.
> > > > It is not enough to know just the number of available buffers
> > > > (credits) for the downstream because the size of these buffers can
> > > > be different. So we are proposing to resolve this problem in the
> > > > following way: if the downstream buffer size is changed, then the
> > > > upstream should send a buffer of a size not greater than the new
> > > > one, regardless of how big the current buffer on the upstream is.
> > > > (pollBuffer should receive parameters like bufferSize and return a
> > > > buffer not greater than it.)
> > >
> > > So apart from adding buffer size information to the `AddCredit`
> > > message, we will need to support a case where the upstream
> > > subpartition has already produced a buffer with the older size (for
> > > example 32KB), while the next credit arrives with an allowance for a
> > > smaller size (16KB). In that case, we are only allowed to send the
> > > portion of the data from this buffer that fits into the new buffer
> > > size, and keep announcing the remaining part as available backlog.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Wed, Jul 14, 2021 at 08:33, Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > > - The subtask observes the changes in the throughput and changes
> > > >   the buffer size during the whole life period of the task.
> > > > - The subtask sends buffer size and number of available buffers to
> > > >   the upstream to the corresponding subpartition.
> > > > - Upstream changes the buffer size corresponding to the received
> > > >   information.
> > > > - Upstream sends the data and number of filled buffers to the
> > > >   downstream.
> > > >
> > > > Will the above steps of buffer size adjustment cause problems with
> > > > credit-based flow control (mainly for downsizing), since downstream
> > > > adjusts down first?
> > > >
> > > > Here is the quote from the blog [1]:
> > > > "Credit-based flow control makes sure that whatever is “on the wire”
> > > > will have capacity at the receiver to handle."
> > > >
> > > > [1]
> > > > https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
> > > >
> > > > On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for driving this, I think it is really helpful for jobs
> > > > > suffering from backpressure.
> > > > >
> > > > > Best,
> > > > > Yingjie
> > > > >
> > > > > On Fri, Jul 9, 2021 at 10:59 PM, Anton,Kalashnikov
> > > > > <kaa....@yandex.com> wrote:
> > > > >
> > > > > > Hey!
> > > > > >
> > > > > > There is a wish to decrease the amount of in-flight data, which
> > > > > > can improve aligned checkpoint time (less in-flight data to
> > > > > > process before a checkpoint can complete) and improve the
> > > > > > behaviour and performance of unaligned checkpoints (less
> > > > > > in-flight data that needs to be persisted in every unaligned
> > > > > > checkpoint).
> > > > > > The main idea is not to keep as much in-flight data as we have
> > > > > > memory for, but to keep the amount of data that can predictably
> > > > > > be handled within a configured amount of time (e.g. we keep the
> > > > > > data that can be processed in 1 sec). This can be achieved by
> > > > > > calculating the effective throughput and then changing the
> > > > > > buffer size based on this throughput. You can find more details
> > > > > > about the proposal here [1].
> > > > > >
> > > > > > What are your thoughts about it?
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > > > > >
> > > > > > --
> > > > > > Best regards,
> > > > > > Anton Kalashnikov
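
For anyone skimming the thread, here is a rough sketch of how I read the two ideas above: the buffer size is derived from the measured throughput, and the sender treats the announced size as a recommendation rather than a hard limit. This is not the FLIP's implementation. The names (`recommended_buffer_size`, `Sender`), the 32 KB segment size, the lower bound, and the choice to divide the target amount of data evenly across the in-flight buffers are all my own assumptions for illustration.

```python
from dataclasses import dataclass

# Assumed values for illustration only; the real limits come from the
# job configuration, not from this thread.
MAX_SEGMENT_SIZE = 32 * 1024  # configured memory segment size, in bytes
MIN_BUFFER_SIZE = 256         # hypothetical lower bound, in bytes


def recommended_buffer_size(throughput_bytes_per_sec, target_seconds,
                            buffers_in_flight):
    """Pick a buffer size so that all in-flight buffers together hold
    roughly `target_seconds` worth of data at the observed throughput.

    Splitting the total evenly across buffers is an assumption of this
    sketch, not something the thread specifies.
    """
    if buffers_in_flight <= 0:
        raise ValueError("buffers_in_flight must be positive")
    desired_total = throughput_bytes_per_sec * target_seconds
    per_buffer = int(desired_total / buffers_in_flight)
    # Clamp into the range the receiver is known to support.
    return max(MIN_BUFFER_SIZE, min(MAX_SEGMENT_SIZE, per_buffer))


@dataclass
class Sender:
    """Upstream side: the announced size is only a recommendation."""
    recommended_size: int = MAX_SEGMENT_SIZE

    def announce(self, new_size):
        # Only buffers requested after this call are capped to new_size.
        self.recommended_size = min(new_size, MAX_SEGMENT_SIZE)

    def new_buffer_capacity(self):
        return self.recommended_size

    def can_send(self, filled_bytes):
        # Already filled buffers are sent untouched: receivers accept
        # anything up to the maximum segment size, so no slicing needed.
        return filled_bytes <= MAX_SEGMENT_SIZE
```

With these assumed numbers, a subtask processing about 1 MB/s with 100 in-flight buffers and a 1 second target would announce a 10000 byte buffer size, while a buffer that was already filled to 32 KB before the announcement could still be sent as-is.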