Hi Steven,

As downstream and upstream nodes are decoupled, if the downstream node adjusts its buffer size first, there will be a lag until the updated buffer size information reaches the upstream node. It is a problem, but it has a quite simple solution that we described in the FLIP document:

> Sending the buffer of the right size.
> It is not enough to know just the number of available buffers (credits) for
> the downstream because the size of these buffers can be different.
> So we are proposing to resolve this problem in the following way: If the
> downstream buffer size is changed then the upstream should send the buffer
> of the size not greater than the new one regardless of how big the current
> buffer on the upstream. (pollBuffer should receive parameters like
> bufferSize and return buffer not greater than it)

So apart from adding buffer size information to the `AddCredit` message, we will need to support a case where the upstream subpartition has already produced a buffer with the older size (for example 32KB), while the next credit arrives with an allowance for a smaller size (16KB). In that case, we are only allowed to send the portion of the data from this buffer that fits into the new, updated buffer size, and keep announcing the remaining part as available backlog.

Best,
Piotrek

On Wed, 14 Jul 2021 at 08:33, Steven Wu <stevenz...@gmail.com> wrote:

> - The subtask observes the changes in the throughput and changes the
>   buffer size during the whole life period of the task.
> - The subtask sends buffer size and number of available buffers to the
>   upstream to the corresponding subpartition.
> - Upstream changes the buffer size corresponding to the received
>   information.
> - Upstream sends the data and number of filled buffers to the downstream
>
> Will the above steps of buffer size adjustment cause problems with
> credit-based flow control (mainly for downsizing), since the downstream
> adjusts down first?
>
> Here is the quote from the blog [1]:
> "Credit-based flow control makes sure that whatever is “on the wire” will
> have capacity at the receiver to handle."
>
> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
>
> On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:
>
> > Hi,
> >
> > Thanks for driving this, I think it is really helpful for jobs suffering
> > from backpressure.
> >
> > Best,
> > Yingjie
> >
> > Anton,Kalashnikov <kaa....@yandex.com> wrote on Fri, 9 Jul 2021 at 10:59 PM:
> >
> > > Hey!
> > >
> > > There is a wish to decrease the amount of in-flight data, which can
> > > improve aligned checkpoint time (less in-flight data to process before
> > > a checkpoint can complete) and improve the behaviour and performance of
> > > unaligned checkpoints (less in-flight data that needs to be persisted
> > > in every unaligned checkpoint). The main idea is not to keep as much
> > > in-flight data as memory allows, but to keep the amount of data that
> > > can predictably be handled within a configured amount of time (e.g. we
> > > keep data which can be processed in 1 sec). It can be achieved by
> > > calculating the effective throughput and then changing the buffer size
> > > based on this throughput. You can find more details about the proposal
> > > here [1].
> > >
> > > What are your thoughts about it?
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > >
> > > --
> > > Best regards,
> > > Anton Kalashnikov
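
For illustration, here is a minimal Java sketch of the downsizing case from the reply at the top of this message: a buffer was already produced at the older size (32KB) and the next credit only allows 16KB. This is not Flink's actual code. `SubpartitionSketch`, `add` and `backlog` are hypothetical names, and the `pollBuffer(int)` signature is only an assumption based on the FLIP text saying that pollBuffer should receive a parameter like bufferSize.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical stand-in for an upstream subpartition; not Flink's real class. */
class SubpartitionSketch {
    // Buffers already produced, possibly at an older, larger size.
    private final Deque<byte[]> produced = new ArrayDeque<>();

    /** Queues a buffer that the upstream has already filled. */
    void add(byte[] buffer) {
        produced.addLast(buffer);
    }

    /**
     * Returns at most maxBufferSize bytes from the head buffer, or null if
     * nothing is queued. Any remainder is pushed back to the head of the
     * queue, so it keeps being reported as available backlog.
     */
    byte[] pollBuffer(int maxBufferSize) {
        if (maxBufferSize <= 0) {
            throw new IllegalArgumentException("maxBufferSize must be positive");
        }
        byte[] head = produced.pollFirst();
        if (head == null) {
            return null;
        }
        if (head.length <= maxBufferSize) {
            return head; // fits into the size announced with the credit
        }
        // Too big for the new size: send only the part that fits now.
        produced.addFirst(Arrays.copyOfRange(head, maxBufferSize, head.length));
        return Arrays.copyOfRange(head, 0, maxBufferSize);
    }

    /** Number of queued buffers (including any remainder) still to be sent. */
    int backlog() {
        return produced.size();
    }
}
```

With one 32KB buffer queued, `pollBuffer(16 * 1024)` hands out the first 16KB and leaves the other 16KB as a backlog of one, which the next credit can then drain.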