Hi Till,

> I assume that buffer sizes are only
> changed for newly assigned buffers/credits, right? Otherwise, the data
> could already be on the wire and then it wouldn't fit on the receiver side.
> Or do we have a back channel mechanism to tell the sender that a part of a
> buffer needs to be resent once more capacity is available?

Initially our implementation proposal intended to implement the first
option. The buffer size would be attached to a credit message, so the
receiver would first need to allocate a buffer with the updated size and
send the credit upstream, and the sender would be allowed to send only as
much data as the credit allows. So there would be no problem with changing
buffer sizes while something is "on the wire".
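
To make the first option a bit more concrete, here is a minimal sketch, not
the actual implementation. `CreditSizedSender`, `sendForCredit` and
`backlogBytes` are hypothetical names used only for illustration; they are
not Flink classes or methods. It assumes each credit carries the size of the
buffer that the receiver has already allocated.

```java
// Hypothetical sketch; none of these names exist in Flink.
final class CreditSizedSender {
    private final byte[] pending; // data already produced by the subpartition
    private int position;         // index of the first byte not yet sent

    CreditSizedSender(byte[] pending) {
        this.pending = pending;
    }

    /** Bytes not yet sent; this is what would keep being announced as backlog. */
    int backlogBytes() {
        return pending.length - position;
    }

    /**
     * Consumes one credit. The credit carries the size of the buffer the
     * receiver allocated, so the returned chunk is never larger than that.
     */
    byte[] sendForCredit(int creditBufferSize) {
        if (creditBufferSize <= 0) {
            throw new IllegalArgumentException("credit buffer size must be positive");
        }
        int length = Math.min(creditBufferSize, backlogBytes());
        byte[] chunk = new byte[length];
        System.arraycopy(pending, position, chunk, 0, length);
        position += length;
        return chunk;
    }
}
```

With a 32KB buffer already filled and a credit for 16KB, only the first 16KB
would go out and the remaining 16KB would stay on the sender as backlog.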

However, Anton suggested an even simpler idea to me today. There is actually
no problem with receivers supporting all buffer sizes up to the maximum
allowed size (the currently configured memory segment size). Thus the new
buffer size can be treated as a recommendation by the sender. We can announce
a new buffer size, and the sender will start capping newly requested buffers
to that size, but we can still send already filled buffers in chunks of
any size, as long as it does not exceed the max memory segment size. In this
way we can leave any already filled buffers on the sender side untouched and
we do not need to partition/slice them before sending them down, making at
least the initial version even simpler. This way we also do not need to
track which credits have which sizes. We just announce
a single value: the "recommended/requested buffer size".
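
A rough sketch of this second option, under the same caveat: the class and
method names below (`RecommendedSizeSender`, `announce`,
`capacityForNewBuffer`, `canSendAsIs`) are hypothetical and only illustrate
the idea. The assumption is that the receiver can always accept anything up
to the max memory segment size, so the announced value only affects buffers
requested after the announcement.

```java
// Hypothetical sketch; none of these names exist in Flink.
final class RecommendedSizeSender {
    private final int maxSegmentSize; // upper bound the receiver always supports
    private int recommendedSize;      // latest value announced by the receiver

    RecommendedSizeSender(int maxSegmentSize) {
        if (maxSegmentSize <= 0) {
            throw new IllegalArgumentException("max segment size must be positive");
        }
        this.maxSegmentSize = maxSegmentSize;
        this.recommendedSize = maxSegmentSize;
    }

    /** Receiver announces a new recommended size; clamp it to [1, maxSegmentSize]. */
    void announce(int newSize) {
        recommendedSize = Math.max(1, Math.min(newSize, maxSegmentSize));
    }

    /** Newly requested buffers are capped to the current recommendation. */
    int capacityForNewBuffer() {
        return recommendedSize;
    }

    /** Already filled buffers go out untouched as long as they fit a memory segment. */
    boolean canSendAsIs(int filledBytes) {
        return filledBytes <= maxSegmentSize;
    }
}
```

Note that there is no per-credit size here: a 32KB buffer filled before a
16KB announcement would still be sent whole, and only later buffers would be
capped to 16KB.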

Piotrek

On Thu, Jul 15, 2021 at 17:27, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi everyone,
>
> Thanks a lot for creating this FLIP Anton and Piotr. I think it looks like
> a very promising solution for speeding up our checkpoints and being able to
> create them more reliably.
>
> Following up on Steven's question: I assume that buffer sizes are only
> changed for newly assigned buffers/credits, right? Otherwise, the data
> could already be on the wire and then it wouldn't fit on the receiver side.
> Or do we have a back channel mechanism to tell the sender that a part of a
> buffer needs to be resent once more capacity is available?
>
> Cheers,
> Till
>
> On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi Steven,
> >
> > As downstream/upstream nodes are decoupled, if a downstream node adjusts
> > its buffer size first, there will be a lag until this updated buffer
> > size information reaches the upstream node. It is a problem, but it has a
> > quite simple solution that we described in the FLIP document:
> >
> > > Sending the buffer of the right size.
> > > It is not enough to know just the number of available buffers (credits)
> > > for the downstream because the size of these buffers can be different.
> > > So we are proposing to resolve this problem in the following way: If the
> > > downstream buffer size is changed then the upstream should send
> > > the buffer of the size not greater than the new one regardless of how big
> > > the current buffer on the upstream. (pollBuffer should receive
> > > parameters like bufferSize and return buffer not greater than it)
> >
> > So apart from adding buffer size information to the `AddCredit` message, we
> > will need to support a case where an upstream subpartition has already
> > produced a buffer with the older size (for example 32KB), while the next
> > credit arrives with an allowance for a smaller size (16KB). In that case,
> > we are only allowed to send a portion of the data from this buffer that
> > fits into the new updated buffer size, and keep announcing the remaining
> > part as available backlog.
> >
> > Best,
> > Piotrek
> >
> >
> > On Wed, Jul 14, 2021 at 08:33, Steven Wu <stevenz...@gmail.com> wrote:
> >
> > >    - The subtask observes the changes in the throughput and changes the
> > >    buffer size during the whole life period of the task.
> > >    - The subtask sends buffer size and number of available buffers to the
> > >    upstream to the corresponding subpartition.
> > >    - Upstream changes the buffer size corresponding to the received
> > >    information.
> > >    - Upstream sends the data and number of filled buffers to the
> > >    downstream
> > >
> > > Will the above steps of buffer size adjustment cause problems with
> > > credit-based flow control (mainly for downsizing), since the downstream
> > > adjusts down first?
> > >
> > > Here is the quote from the blog [1]:
> > > "Credit-based flow control makes sure that whatever is “on the wire” will
> > > have capacity at the receiver to handle."
> > >
> > >
> > > [1]
> > > https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
> > >
> > >
> > > On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for driving this, I think it is really helpful for jobs
> > > > suffering from backpressure.
> > > >
> > > > Best,
> > > > Yingjie
> > > >
> > > > On Fri, Jul 9, 2021 at 10:59 PM, Anton Kalashnikov <kaa....@yandex.com> wrote:
> > > >
> > > > > Hey!
> > > > >
> > > > > There is a wish to decrease the amount of in-flight data, which can
> > > > > improve aligned checkpoint time (less in-flight data to process before
> > > > > a checkpoint can complete) and improve the behaviour and performance of
> > > > > unaligned checkpoints (less in-flight data that needs to be persisted
> > > > > in every unaligned checkpoint). The main idea is not to keep as much
> > > > > in-flight data as we have memory for, but to keep only the amount of
> > > > > data that can predictably be handled within a configured amount of
> > > > > time (e.g. we keep the data that can be processed in 1 sec). This can
> > > > > be achieved by calculating the effective throughput and then changing
> > > > > the buffer size based on this throughput. You can find more details
> > > > > about the proposal here [1].
> > > > >
> > > > > What are your thoughts about it?
> > > > >
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > > > >
> > > > >
> > > > > --
> > > > > Best regards,
> > > > > Anton Kalashnikov
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
