I really like the new idea.

On Thu, Jul 15, 2021 at 11:51 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi Till,
>
> >  I assume that buffer sizes are only
> > changed for newly assigned buffers/credits, right? Otherwise, the data
> > could already be on the wire and then it wouldn't fit on the receiver
> side.
> > Or do we have a back channel mechanism to tell the sender that a part of
> a
> > buffer needs to be resent once more capacity is available?
>
> Initially our implementation proposal was intending to implement the first
> option. Buffer size would be attached to a credit message, so first
> received would need to allocate a buffer with the updated size, send the
> credit upstream, and sender would be allowed to only send as much data as
> in the credit. So there would be no way and no problem with changing buffer
> sizes while something is "on the wire".
>
> However Anton suggested an even simpler idea to me today. There is actually
> no problem with receivers supporting all buffer sizes up to the maximum
> allowed size (current configured memory segment size). Thus new buffer size
> can be treated as a recommendation by the sender. We can announce a new
> buffer size, and the sender will start capping the newly requested buffer
> to that size, but we can still send already filled buffers in chunks with
> any size, as long as it's below max memory segment size. In this way we can
> leave any already filled in buffers on the sender side untouched and we do
> not need to partition/slice them before sending them down, making at least
> the initial version even simpler. This way we also do not need to
> differentiate that different credits have different sizes. We just announce
> a single value "recommended/requested buffer size".
>
> Piotrek
>
> czw., 15 lip 2021 o 17:27 Till Rohrmann <trohrm...@apache.org> napisał(a):
>
> > Hi everyone,
> >
> > Thanks a lot for creating this FLIP Anton and Piotr. I think it looks
> like
> > a very promising solution for speeding up our checkpoints and being able
> to
> > create them more reliably.
> >
> > Following up on Steven's question: I assume that buffer sizes are only
> > changed for newly assigned buffers/credits, right? Otherwise, the data
> > could already be on the wire and then it wouldn't fit on the receiver
> side.
> > Or do we have a back channel mechanism to tell the sender that a part of
> a
> > buffer needs to be resent once more capacity is available?
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> > > Hi Steven,
> > >
> > > As downstream/upstream nodes are decoupled, if downstream nodes adjust
> > > first it's buffer size first, there will be a lag until this updated
> > buffer
> > > size information reaches the upstream node.. It is a problem, but it
> has
> > a
> > > quite simple solution that we described in the FLIP document:
> > >
> > > > Sending the buffer of the right size.
> > > > It is not enough to know just the number of available buffers
> (credits)
> > > for the downstream because the size of these buffers can be different.
> > > > So we are proposing to resolve this problem in the following way: If
> > the
> > > downstream buffer size is changed then the upstream should send
> > > > the buffer of the size not greater than the new one regardless of how
> > big
> > > the current buffer on the upstream. (pollBuffer should receive
> > > > parameters like bufferSize and return buffer not greater than it)
> > >
> > > So apart from adding buffer size information to the `AddCredit`
> message,
> > we
> > > will need to support a case where upstream subpartition has already
> > > produced a buffer with older size (for example 32KB), while the next
> > credit
> > > arrives with an allowance for a smaller size (16KB). In that case, we
> are
> > > only allowed to send a portion of the data from this buffer that fits
> > into
> > > the new updated buffer size, and keep announcing the remaining part as
> > > available backlog.
> > >
> > > Best,
> > > Piotrek
> > >
> > >
> > > śr., 14 lip 2021 o 08:33 Steven Wu <stevenz...@gmail.com> napisał(a):
> > >
> > > >    - The subtask observes the changes in the throughput and changes
> the
> > > >    buffer size during the whole life period of the task.
> > > >    - The subtask sends buffer size and number of available buffers to
> > the
> > > >    upstream to the corresponding subpartition.
> > > >    - Upstream changes the buffer size corresponding to the received
> > > >    information.
> > > >    - Upstream sends the data and number of filled buffers to the
> > > downstream
> > > >
> > > >
> > > > Will the above steps of buffer size adjustment cause problems with
> > > > credit-based flow control (mainly for downsizing), since downstream
> > > > adjust down first?
> > > >
> > > > Here is the quote from the blog[1]
> > > > "Credit-based flow control makes sure that whatever is “on the wire”
> > will
> > > > have capacity at the receiver to handle. "
> > > >
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
> > > >
> > > >
> > > > On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for driving this, I think it is really helpful for jobs
> > > suffering
> > > > > from backpressure.
> > > > >
> > > > > Best,
> > > > > Yingjie
> > > > >
> > > > > Anton,Kalashnikov <kaa....@yandex.com> 于2021年7月9日周五 下午10:59写道:
> > > > >
> > > > > > Hey!
> > > > > >
> > > > > > There is a wish to decrease amount of in-flight data which can
> > > improve
> > > > > > aligned checkpoint time(fewer in-flight data to process before
> > > > > > checkpoint can complete) and improve the behaviour and
> performance
> > of
> > > > > > unaligned checkpoints (fewer in-flight data that needs to be
> > > persisted
> > > > > > in every unaligned checkpoint). The main idea is not to keep as
> > much
> > > > > > in-flight data as much memory we have but keeping the amount of
> > data
> > > > > > which can be predictably handling for configured amount of
> time(ex.
> > > we
> > > > > > keep data which can be processed in 1 sec). It can be achieved by
> > > > > > calculation of the effective throughput and following changes the
> > > > buffer
> > > > > > size based on the this throughput. More details about the
> proposal
> > > you
> > > > > > can find here [1].
> > > > > >
> > > > > > What are you thoughts about it?
> > > > > >
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best regards,
> > > > > > Anton Kalashnikov
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to