@Thomas just to double check:

  - parallelism and configuration changes should be well possible on
unaligned checkpoints
  - changes in state types and JobGraph structure would be tricky, and
changing the on-the-wire types would not be possible.

On Wed, Aug 14, 2019 at 7:48 PM Thomas Weise <t...@apache.org> wrote:

> -->
>
> On Wed, Aug 14, 2019 at 10:23 AM zhijiang
> <wangzhijiang...@aliyun.com.invalid> wrote:
>
> > Thanks for these great points and discussions!
> >
> > 1. Considering the way of triggering checkpoint RPC calls to all the
> tasks
> > from Chandy Lamport, it combines two different mechanisms together to
> make
> > sure that the trigger could be fast in different scenarios.
> > But in flink world it might be not very worth trying that way, just as
> > Stephan's analysis for it. Another concern is that it might bring more
> > heavy loads for JobMaster broadcasting this checkpoint RPC to all the
> tasks
> > in large scale job, especially for the very short checkpoint interval.
> > Furthermore, it would also delay the execution of other important RPCs,
> > bringing potential timeout risks.
> >
> > 2. I agree with the idea of drawing on the way "take state snapshot on
> > first barrier" from Chandy Lamport instead of barrier alignment combining
> > with unaligned checkpoints in flink.
> >
> > > >>>> The benefit would be less latency increase in the channels which
> > already have received barriers.
> > > >>>> However, as mentioned before, not prioritizing the inputs from
> > which barriers are still missing can also have an adverse effect.
> >
> > I think we will not have an adverse effect if not prioritizing the inputs
> > w/o barriers in this case. After sync snapshot, the task could actually
> > process any input channels. For the input channel receiving the first
> > barrier, we already have the obvious boundary for persisting buffers. For
> > other channels w/o barriers we could persist the following buffers for
> > these channels until barrier arrives in network. Because based on the
> > credit based flow control, the barrier does not need credit to transport,
> > then as long as the sender overtakes the barrier across the output
> queue,
> > the network stack would transport this barrier immediately no matter with
> > the inputs condition on receiver side. So there is no requirements to
> > consume accumulated buffers in these channels for higher priority. If so
> it
> > seems that we will not waste any CPU cycles as Piotr concerns before.
> >
>
> I'm not sure I follow this. For the checkpoint to complete, any buffer that
> arrived prior to the barrier would have to be part of the checkpointed state.
> So wouldn't it be important to finish persisting these buffers as fast as
> possible by prioritizing respective inputs? The task won't be able to
> process records from the inputs that have seen the barrier fast when it is
> already backpressured (or causing the backpressure).
>
>
> >
> > 3. Suppose the unaligned checkpoints performing well in practice, is it
> > possible to make it as the only mechanism for handling all the cases? I
> > mean for the non-backpressure scenario, there are less buffers even empty
> > in input/output queue, then the "overtaking barrier--> trigger snapshot
> on
> > first barrier--> persist buffers" might still work well. So we do not
> need
> > to maintain two sets of mechanisms in the end.
> >
> > 4. The initial motivation of this discussion is for checkpoint timeout in
> > backpressure scenario. If we adjust the default timeout to a very big
> > value, that means the checkpoint would never timeout and we only need to
> > wait it finish. Then are there still any other problems/concerns if
> > checkpoint takes a long time to finish? Although we already knew some
> issues
> > before, it is better to gather more user feedbacks to confirm which
> aspects
> > could be solved in this feature design. E.g. the sink commit delay might
> > not be covered by the unaligned solution.
> >
>
> Checkpoints taking too long is the concern that sparks this discussion
> (timeout is just a symptom). The slowness issue also applies to the
> savepoint use case. We would need to be able to take a savepoint fast in
> order to roll forward a fix that can alleviate the backpressure (like
> changing parallelism or making a different configuration change).
>
>
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From:Stephan Ewen <se...@apache.org>
> > Send Time: August 14, 2019 (Wednesday) 17:43
> > To:dev <dev@flink.apache.org>
> > Subject:Re: Checkpointing under backpressure
> >
> > Quick note: The current implementation is
> >
> > Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)
> >
> > On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > > Thanks for the great ideas so far.
> > >
> > > +1
> > >
> > > Regarding other things raised, I mostly agree with Stephan.
> > >
> > > I like the idea of simultaneously starting the checkpoint everywhere
> via
> > > RPC call (especially in cases where Tasks are busy doing some
> synchronous
> > > operations for example for tens of milliseconds. In that case every
> > network
> > > exchange adds tens of milliseconds of delay in propagating the
> > checkpoint).
> > > However I agree that this might be a premature optimisation assuming
> the
> > > current state of our code (we already have checkpoint barriers).
> > >
> > > However I like the idea of switching from:
> > >
> > > 1. A -> S -> F (Align -> snapshot -> forward markers)
> > >
> > > To
> > >
> > > 2. S -> F -> L (Snapshot -> forward markers -> log pending channels)
> > >
> > > Or even to
> > >
> > > 3. F -> S -> L (Forward markers -> snapshot -> log pending channels)
> > >
> > > It feels to me like this would decouple propagation of checkpoints from
> > > costs of synchronous snapshots and waiting for all of the checkpoint
> > > barriers to arrive (even if they will overtake in-flight records, this
> > > might take some time).
> > >
> > > > What I like about the Chandy Lamport approach (2.) initiated from
> > > sources is that:
> > > >       - Snapshotting imposes no modification to normal processing.
> > >
> > > Yes, I agree that would be nice. Currently, during the alignment and
> > > blocking of the input channels, we might be wasting CPU cycles of up
> > stream
> > > tasks. If we succeed in designing new checkpointing mechanism to not
> > > disrupt/block regular data processing (% the extra IO cost for logging
> > the
> > > in-flight records), that would be a huge improvement.
> > >
> > > Piotrek
> > >
> > > > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com>
> > wrote:
> > > >
> > > > Sure I see. In cases when no periodic aligned snapshots are employed
> > > this is the only option.
> > > >
> > > > Two things that were not highlighted enough so far on the proposed
> > > protocol (including my mails):
> > > >       - The Recovery/Reconfiguration strategy should strictly
> > prioritise
> > > processing logged events before entering normal task input operation.
> > > Otherwise causality can be violated. This also means dataflow recovery
> > will
> > > be expected to be slower than the one employed on an aligned snapshot.
> > > >       - Same as with state capture, markers should be forwarded upon
> > > first marker received on input. No later than that. Otherwise we have
> > > duplicate side effects.
> > > >
> > > > Thanks for the great ideas so far.
> > > >
> > > > Paris
> > > >
> > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
> > > >>
> > > >> Scaling with unaligned checkpoints might be a necessity.
> > > >>
> > > >> Let's assume the job failed due to a lost TaskManager, but no new
> > > >> TaskManager becomes available.
> > > >> In that case we need to scale down based on the latest complete
> > > checkpoint,
> > > >> because we cannot produce a new checkpoint.
> > > >>
> > > >>
> > > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <
> > seniorcarb...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> +1 I think we are on the same page Stephan.
> > > >>>
> > > >>> Rescaling on unaligned checkpoint sounds challenging and a bit
> > > >>> unnecessary. No?
> > > >>> Why not stick to aligned snapshots for live
> > > reconfiguration/rescaling?
> > > >>> It’s a pretty rare operation and it would simplify things by a lot.
> > > >>> Everything can be “staged” upon alignment including replacing
> > channels
> > > and
> > > >>> tasks.
> > > >>>
> > > >>> -Paris
> > > >>>
> > > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> > > >>>>
> > > >>>> Hi all!
> > > >>>>
> > > >>>> Yes, the first proposal of "unaligned checkpoints" (probably two
> > years
> > > >>> back
> > > >>>> now) drew a major inspiration from Chandy Lamport, as did actually
> > the
> > > >>>> original checkpointing algorithm.
> > > >>>>
> > > >>>> "Logging data between first and last barrier" versus "barrier
> > jumping
> > > >>> over
> > > >>>> buffer and storing those buffers" is pretty much the same.
> > > >>>> However, there are a few nice benefits of the proposal of
> unaligned
> > > >>>> checkpoints over Chandy-Lamport.
> > > >>>>
> > > >>>> *## Benefits of Unaligned Checkpoints*
> > > >>>>
> > > >>>> (1) It is very similar to the original algorithm (can be seen as an
> > > >>>> optional feature purely in the network stack) and thus can share
> > > >>>> lots of code paths.
> > > >>>>
> > > >>>> (2) Less data stored. If we make the "jump over buffers" part
> > timeout
> > > >>> based
> > > >>>> (for example barrier overtakes buffers if not flushed within 10ms)
> > > then
> > > >>>> checkpoints are in the common case of flowing pipelines aligned
> > > without
> > > >>>> in-flight data. Only back pressured cases store some in-flight
> data,
> > > >>> which
> > > >>>> means we don't regress in the common case and only fix the back
> > > pressure
> > > >>>> case.
> > > >>>>
> > > >>>> (3) Faster checkpoints. Chandy Lamport still waits for all
> barriers
> > to
> > > >>>> arrive naturally, logging on the way. If data processing is slow,
> > this
> > > >>> can
> > > >>>> still take quite a while.
> > > >>>>
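To make point (2) above more concrete, here is a rough sketch of a timeout-based "jump over buffers" output queue. It is only an illustration under assumptions, not the Flink network stack: OutputQueue, BARRIER, poll and the explicit now_ms clock are hypothetical names, and the 10 ms default is just the example value from the mail. A barrier that reaches the head of the queue in time is sent in order and nothing is persisted; a barrier that waits longer than the timeout overtakes the buffers ahead of it, and those buffers become in-flight data of the checkpoint.

```python
from collections import deque

BARRIER = object()  # hypothetical stand-in for a checkpoint barrier


class OutputQueue:
    """Toy output queue where a barrier overtakes buffers after a timeout."""

    def __init__(self, overtake_timeout_ms=10):
        self.timeout = overtake_timeout_ms
        self.queue = deque()             # buffers and barrier in send order
        self.barrier_enqueued_at = None  # assumes one barrier in flight at a time
        self.persisted = []              # overtaken buffers stored with the checkpoint

    def add_buffer(self, buf):
        self.queue.append(buf)

    def add_barrier(self, now_ms):
        self.queue.append(BARRIER)
        self.barrier_enqueued_at = now_ms

    def poll(self, now_ms):
        """Return the next element to send, or None if the queue is empty."""
        if not self.queue:
            return None
        waiting = self.barrier_enqueued_at is not None
        if (waiting and self.queue[0] is not BARRIER
                and now_ms - self.barrier_enqueued_at >= self.timeout):
            # Back pressured case: the barrier jumps over the buffers ahead of it.
            ahead = self.queue.index(BARRIER)
            overtaken = [self.queue.popleft() for _ in range(ahead)]
            self.queue.popleft()                        # take the barrier out
            self.persisted.extend(overtaken)            # in-flight data of the checkpoint
            self.queue.extendleft(reversed(overtaken))  # still sent afterwards, in order
            self.barrier_enqueued_at = None
            return BARRIER
        item = self.queue.popleft()
        if item is BARRIER:
            # Flowing case: the barrier arrived in order, nothing to persist.
            self.barrier_enqueued_at = None
        return item
```

In the flowing case persisted stays empty, which is the "we don't regress in the common case" property; only a queue that does not drain within the timeout stores any buffers.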
> > > >>>> ==> I think both these points are strong reasons to not change the
> > > >>>> mechanism away from "trigger sources" and start with CL-style
> > "trigger
> > > >>> all".
> > > >>>>
> > > >>>>
> > > >>>> *## Possible ways to combine Chandy Lamport and Unaligned
> > Checkpoints*
> > > >>>>
> > > >>>> We can think about something like "take state snapshot on first
> > > barrier"
> > > >>>> and then store buffers until the other barriers arrive. Inside the
> > > >>> network
> > > >>>> stack, barriers could still overtake and persist buffers.
> > > >>>> The benefit would be less latency increase in the channels which
> > > already
> > > >>>> have received barriers.
> > > >>>> However, as mentioned before, not prioritizing the inputs from
> which
> > > >>>> barriers are still missing can also have an adverse effect.
> > > >>>>
> > > >>>>
> > > >>>> *## Concerning upgrades*
> > > >>>>
> > > >>>> I think it is a fair restriction to say that upgrades need to
> happen
> > > on
> > > >>>> aligned checkpoints. It is a rare enough operation.
> > > >>>>
> > > >>>>
> > > >>>> *## Concerning re-scaling (changing parallelism)*
> > > >>>>
> > > >>>> We need to support that on unaligned checkpoints as well. There
> are
> > > >>> several
> > > >>>> feature proposals about automatic scaling, especially down scaling
> > in
> > > >>> case
> > > >>>> of missing resources. The last snapshot might be a regular
> > > checkpoint, so
> > > >>>> all checkpoints need to support rescaling.
> > > >>>>
> > > >>>>
> > > >>>> *## Concerning end-to-end checkpoint duration and "trigger
> sources"
> > > >>> versus
> > > >>>> "trigger all"*
> > > >>>>
> > > >>>> I think for the end-to-end checkpoint duration, an "overtake
> > buffers"
> > > >>>> approach yields faster checkpoints, as mentioned above (Chandy
> > Lamport
> > > >>>> logging still needs to wait for barrier to flow).
> > > >>>>
> > > >>>> I don't see the benefit of a "trigger all tasks via RPC
> > concurrently"
> > > >>>> approach. Bear in mind that it is still a globally coordinated
> > > approach
> > > >>> and
> > > >>>> you need to wait for the global checkpoint to complete before
> > > committing
> > > >>>> any side effects.
> > > >>>> I believe that the checkpoint time is more determined by the state
> > > >>>> checkpoint writing, and the global coordination and metadata
> commit,
> > > than
> > > >>>> by the difference in alignment time between "trigger from source
> and
> > > jump
> > > >>>> over buffers" versus "trigger all tasks concurrently".
> > > >>>>
> > > >>>> Trying to optimize a few tens of milliseconds out of the network
> > stack
> > > >>>> sends (and changing the overall checkpointing approach completely
> > for
> > > >>> that)
> > > >>>> while staying with a globally coordinated checkpoint will send us
> > > down a
> > > >>>> path to a dead end.
> > > >>>>
> > > >>>> To really bring task persistence latency down to 10s of
> milliseconds
> > > (so
> > > >>> we
> > > >>>> can frequently commit in sinks), we need to take an approach
> without
> > > any
> > > >>>> global coordination. Tasks need to establish a persistent recovery
> > > point
> > > >>>> individually and at their own discretion, only then can it be
> > frequent
> > > >>>> enough. To get there, they would need to decouple themselves from
> > the
> > > >>>> predecessor and successor tasks (via something like persistent
> > > channels).
> > > >>>> This is a different discussion, though, somewhat orthogonal to
> this
> > > one
> > > >>>> here.
> > > >>>>
> > > >>>> Best,
> > > >>>> Stephan
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <
> > pi...@ververica.com>
> > > >>> wrote:
> > > >>>>
> > > >>>>> Hi again,
> > > >>>>>
> > > >>>>> Zhu Zhu let me think about this more. Maybe as Paris is writing,
> we
> > > do
> > > >>> not
> > > >>>>> need to block any channels at all, at least assuming credit base
> > flow
> > > >>>>> control. Regarding what should happen with the following
> checkpoint
> > > is
> > > >>>>> another question. Also, should we support concurrent checkpoints
> > and
> > > >>>>> subsuming checkpoints as we do now? Maybe not…
> > > >>>>>
> > > >>>>> Paris
> > > >>>>>
> > > >>>>> Re
> > > >>>>> I. 2. a) and b) - yes, this would have to be taken into account
> > > >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint time
> > will
> > > >>>>> probably be longer than it could be. It might affect external
> > > systems.
> > > >>> For
> > > >>>>> example Kafka, which automatically times out lingering
> > transactions,
> > > and
> > > >>>>> for us, the transaction time is equal to the time between two
> > > >>> checkpoints.
> > > >>>>>
> > > >>>>> II 1. - I’m confused. To make things straight. Flink is currently
> > > >>>>> snapshotting once it receives all of the checkpoint barriers from
> > > all of
> > > >>>>> the input channels and only then it broadcasts the checkpoint
> > barrier
> > > >>> down
> > > >>>>> the stream. And this is correct from exactly-once perspective.
> > > >>>>>
> > > >>>>> As far as I understand, your proposal based on Chandy Lamport
> > > algorithm,
> > > >>>>> is snapshotting the state of the operator on the first checkpoint
> > > >>> barrier,
> > > >>>>> which also looks correct to me.
> > > >>>>>
> > > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about
> > > this.
> > > >>>>>
> > > >>>>> V. Yes, we still need aligned checkpoints, as they are easier for
> > > state
> > > >>>>> migration and upgrades.
> > > >>>>>
> > > >>>>> Piotrek
> > > >>>>>
> > > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <
> seniorcarb...@gmail.com>
> > > >>> wrote:
> > > >>>>>>
> > > >>>>>> Now I see a little more clearly what you have in mind. Thanks
> for
> > > the
> > > >>>>> explanation!
> > > >>>>>> There are a few intermixed concepts here, some having to do with
> > > >>>>> correctness, some with performance.
> > > >>>>>> Before delving deeper I will just enumerate a few things to make
> > > myself
> > > >>>>> a little more helpful if I can.
> > > >>>>>>
> > > >>>>>> I. Initiation
> > > >>>>>> -------------
> > > >>>>>>
> > > >>>>>> 1. RPC to sources only is a less intrusive way to initiate
> > snapshots
> > > >>>>> since you utilize better pipeline parallelism (only a small
> subset
> > of
> > > >>> tasks
> > > >>>>> is running progressively the protocol at a time, if snapshotting
> is
> > > >>> async
> > > >>>>> the overall overhead might not even be observable).
> > > >>>>>>
> > > >>>>>> 2. If we really want an RPC to all initiation take notice of the
> > > >>>>> following implications:
> > > >>>>>>
> > > >>>>>>    a. (correctness) RPC calls are not guaranteed to arrive in
> > every
> > > >>>>> task before a marker from a preceding task.
> > > >>>>>>
> > > >>>>>>    b. (correctness) Either the RPC call OR the first arriving
> > marker
> > > >>>>> should initiate the algorithm. Whichever comes first. If you only
> > do
> > > it
> > > >>> per
> > > >>>>> RPC call then you capture a "late" state that includes side
> effects
> > > of
> > > >>>>> already logged events.
> > > >>>>>>
> > > >>>>>>    c. (performance) Lots of IO will be invoked at the same time
> on
> > > >>>>> the backend store from all tasks. This might lead to high
> > congestion
> > > in
> > > >>>>> async snapshots.
> > > >>>>>>
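A small sketch of point I.2.b above (the RPC call or the first arriving marker starts the snapshot, whichever comes first, and only once per checkpoint). The names SnapshotTrigger, on_rpc, on_marker and the take_snapshot callback are hypothetical and only serve the illustration; they are not existing Flink APIs.

```python
class SnapshotTrigger:
    """Starts the snapshot once per checkpoint id, on RPC or on first marker."""

    def __init__(self, take_snapshot):
        self._take_snapshot = take_snapshot  # hypothetical callback(checkpoint_id, cause)
        self._started = set()                # checkpoint ids already initiated

    def _start_once(self, checkpoint_id, cause):
        if checkpoint_id in self._started:
            return False                     # a later RPC or marker is a no-op
        self._started.add(checkpoint_id)
        self._take_snapshot(checkpoint_id, cause)
        return True

    def on_rpc(self, checkpoint_id):
        return self._start_once(checkpoint_id, "rpc")

    def on_marker(self, checkpoint_id):
        return self._start_once(checkpoint_id, "marker")
```

If a marker from a preceding task arrives before the RPC (case a), the state is captured at the marker and the late RPC does nothing, so the snapshot does not include side effects of events that are already being logged.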
> > > >>>>>> II. Capturing State First
> > > >>>>>> -------------------------
> > > >>>>>>
> > > >>>>>> 1. (correctness) Capturing state at the last marker sounds
> > > incorrect to
> > > >>>>> me (state contains side effects of already logged events based on
> > the
> > > >>>>> proposed scheme). This results into duplicate processing. No?
> > > >>>>>>
> > > >>>>>> III. Channel Blocking / "Alignment"
> > > >>>>>> -----------------------------------
> > > >>>>>>
> > > >>>>>> 1. (performance?) What is the added benefit? We don't want a
> > > "complete"
> > > >>>>> transactional snapshot, async snapshots are purely for
> > > failure-recovery.
> > > >>>>> Thus, I don't see why this needs to be imposed at the expense of
> > > >>>>> performance/throughput. With the proposed scheme the whole
> dataflow
> > > >>> anyway
> > > >>>>> enters snapshotting/logging mode so tasks more or less snapshot
> > > >>>>> concurrently.
> > > >>>>>>
> > > >>>>>> IV Marker Bypassing
> > > >>>>>> -------------------
> > > >>>>>>
> > > >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots so
> > > with
> > > >>>>> some quick thinking it seems correct. I will try to model this later and
> > get
> > > >>> back
> > > >>>>> to you in case I find something wrong.
> > > >>>>>>
> > > >>>>>> 2. (performance) It also sounds like a meaningful optimisation!
> I
> > > like
> > > >>>>> thinking of this as a push-based snapshot. i.e., the producing
> task
> > > >>> somehow
> > > >>>>> triggers forward a consumer/channel to capture its state. By
> > example
> > > >>>>> consider T1 -> |marker t1| -> T2.
> > > >>>>>>
> > > >>>>>> V. Usage of "Async" Snapshots
> > > >>>>>> ---------------------
> > > >>>>>>
> > > >>>>>> 1. Do you see this as a full replacement of "full" aligned
> > > >>>>> snapshots/savepoints? In my view async snapshots will be needed
> > from
> > > >>> time
> > > >>>>> to time but not as frequently. Yet, it seems like a valid
> approach
> > > >>> solely
> > > >>>>> for failure-recovery on the same configuration. Here's why:
> > > >>>>>>
> > > >>>>>>    a. With original snapshotting there is a strong duality
> between
> > > >>>>>>    a stream input (offsets) and committed side effects (internal
> > > >>>>> states and external commits to transactional sinks). While in the
> > > async
> > > >>>>> version, there are uncommitted operations (inflight records).
> Thus,
> > > you
> > > >>>>> cannot use these snapshots for e.g., submitting sql queries with
> > > >>> snapshot
> > > >>>>> isolation. Also, the original snapshotting gives a lot of
> potential
> > > for
> > > >>>>> flink to make proper transactional commits externally.
> > > >>>>>>
> > > >>>>>>    b. Reconfiguration is very tricky, you probably know that
> > better.
> > > >>>>> Inflight channel state is no longer valid in a new configuration
> > > (i.e.,
> > > >>> new
> > > >>>>> dataflow graph, new operators, updated operator logic, different
> > > >>> channels,
> > > >>>>> different parallelism)
> > > >>>>>>
> > > >>>>>> 2. Async snapshots can also be potentially useful for monitoring
> > the
> > > >>>>> general health of a dataflow since they can be analyzed by the
> task
> > > >>> manager
> > > >>>>> about the general performance of a job graph and spot bottlenecks
> > for
> > > >>>>> example.
> > > >>>>>>
> > > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com>
> > > wrote:
> > > >>>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Thomas:
> > > >>>>>>> There are no Jira tickets yet (or maybe there is something very
> > old
> > > >>>>> somewhere). First we want to discuss it, next present FLIP and at
> > > last
> > > >>>>> create tickets :)
> > > >>>>>>>
> > > >>>>>>>> if I understand correctly, then the proposal is to not block
> any
> > > >>>>>>>> input channel at all, but only log data from the backpressured
> > > >>> channel
> > > >>>>> (and
> > > >>>>>>>> make it part of the snapshot) until the barrier arrives
> > > >>>>>>>
> > > >>>>>>> I would guess that it would be better to block the reads,
> unless
> > we
> > > >>> can
> > > >>>>> already process the records from the blocked channel…
> > > >>>>>>>
> > > >>>>>>> Paris:
> > > >>>>>>>
> > > >>>>>>> Thanks for the explanation Paris. I’m starting to understand
> this
> > > more
> > > >>>>> and I like the idea of snapshotting the state of an operator
> before
> > > >>>>> receiving all of the checkpoint barriers - this would allow more
> > > things
> > > >>> to
> > > >>>>> happen at the same time instead of sequentially. As Zhijiang has
> > > pointed
> > > >>>>> out there are some things not considered in your proposal:
> > overtaking
> > > >>>>> output buffers, but maybe those things could be incorporated
> > > together.
> > > >>>>>>>
> > > >>>>>>> Another thing is that from the wiki description I understood
> that
> > > the
> > > >>>>> initial checkpointing is not initialised by any checkpoint
> barrier,
> > > but
> > > >>> by
> > > >>>>> an independent call/message from the Observer. I haven’t played
> > with
> > > >>> this
> > > >>>>> idea a lot, but I had some discussion with Nico and it seems that
> > it
> > > >>> might
> > > >>>>> work:
> > > >>>>>>>
> > > >>>>>>> 1. JobManager sends an RPC “start checkpoint” to all tasks
> > > >>>>>>> 2. Task (with two input channels l1 and l2) upon receiving RPC
> > from
> > > >>> 1.,
> > > >>>>> takes a snapshot of its state and:
> > > >>>>>>> a) broadcast checkpoint barrier down the stream to all channels
> > > (let’s
> > > >>>>> ignore for a moment potential for this barrier to overtake the
> > buffer
> > > >>>>> output data)
> > > >>>>>>> b) for any input channel for which it hasn’t yet received
> > > checkpoint
> > > >>>>> barrier, the data are being added to the checkpoint
> > > >>>>>>> c) once a channel (for example l1) receives a checkpoint
> barrier,
> > > the
> > > >>>>> Task blocks reads from that channel (?)
> > > >>>>>>> d) after all remaining channels (l2) receive checkpoint
> barriers,
> > > the
> > > >>>>> Task first has to process the buffered data; after that it can
> > > unblock
> > > >>> the
> > > >>>>> reads from the channels
> > > >>>>>>>
> > > >>>>>>> Checkpoint barriers do not cascade/flow through different tasks
> > > here.
> > > >>>>> Checkpoint barrier emitted from Task1, reaches only the immediate
> > > >>>>> downstream Tasks. Thanks to this setup, total checkpointing time
> is
> > > not
> > > >>> sum
> > > >>>>> of checkpointing times of all Tasks one by one, but more or less
> > max
> > > of
> > > >>> the
> > > >>>>> slowest Tasks. Right?
> > > >>>>>>>
> > > >>>>>>> Couple of intriguing thoughts are:
> > > >>>>>>> 3. checkpoint barriers overtaking the output buffers
> > > >>>>>>> 4. can we keep processing some data (in order to not waste CPU
> > > cycles)
> > > >>>>> after we have taken the snapshot of the Task. I think we could.
> > > >>>>>>>
> > > >>>>>>> Piotrek
> > > >>>>>>>
> > > >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org>
> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Great discussion! I'm excited that this is already under
> > > >>>>> consideration! Are
> > > >>>>>>>> there any JIRAs or other traces of discussion to follow?
> > > >>>>>>>>
> > > >>>>>>>> Paris, if I understand correctly, then the proposal is to not
> > > block
> > > >>> any
> > > >>>>>>>> input channel at all, but only log data from the backpressured
> > > >>> channel
> > > >>>>> (and
> > > >>>>>>>> make it part of the snapshot) until the barrier arrives? This
> is
> > > >>>>>>>> intriguing. But probably there is also a benefit to not
> > > continue
> > > >>>>> reading
> > > >>>>>>>> I1 since that could speed up retrieval from I2. Also, if the
> > user
> > > >>> code
> > > >>>>> is
> > > >>>>>>>> the cause of backpressure, this would avoid pumping more data
> > into
> > > >>> the
> > > >>>>>>>> process function.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Thomas
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <
> > > wangzhijiang...@aliyun.com
> > > >>>>> .invalid>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Paris,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the detailed sharing. And I think it is very
> similar
> > > with
> > > >>>>> the
> > > >>>>>>>>> way of overtaking we proposed before.
> > > >>>>>>>>>
> > > >>>>>>>>> There are some tiny differences:
> > > >>>>>>>>> The way of overtaking might need to snapshot all the
> > input/output
> > > >>>>> queues.
> > > >>>>>>>>> Chandy Lamport seems to only need to snapshot (n-1) input
> channels
> > > >>> after
> > > >>>>> the
> > > >>>>>>>>> first barrier arrives, which might reduce the state size a bit.
> > > But
> > > >>>>> normally
> > > >>>>>>>>> there should be less buffers for the first input channel with
> > > >>> barrier.
> > > >>>>>>>>> The output barrier still follows with regular data stream in
> > > Chandy
> > > >>>>>>>>> Lamport, the same way as current flink. For overtaking way,
> we
> > > need
> > > >>>>> to pay
> > > >>>>>>>>> extra efforts to make barrier transport firstly before output
> > > queue
> > > >>> on
> > > >>>>>>>>> upstream side, and change the way of barrier alignment based
> on
> > > >>>>> receiving
> > > >>>>>>>>> instead of current reading on downstream side.
> > > >>>>>>>>> In the backpressure caused by data skew, the first barrier in
> > > almost
> > > >>>>> empty
> > > >>>>>>>>> input channel should arrive much earlier than the last heavy
> > load
> > > >>>>> input
> > > >>>>>>>>> channel, so the Chandy Lamport could benefit well. But for
> the
> > > case
> > > >>>>> of all
> > > >>>>>>>>> balanced heavy load input channels, I mean the first arrived
> > > barrier
> > > >>>>> might
> > > >>>>>>>>> still take much time, then the overtaking way could still fit
> > > well
> > > >>> to
> > > >>>>> speed
> > > >>>>>>>>> up checkpoint.
> > > >>>>>>>>> Anyway, your proposed suggestion is helpful on my side,
> > > especially
> > > >>>>>>>>> considering some implementation details .
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Zhijiang
> > > >>>>>>>>>
> > > ------------------------------------------------------------------
> > > >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
> > > >>>>>>>>> Send Time: August 13, 2019 (Tuesday) 14:03
> > > >>>>>>>>> To:dev <dev@flink.apache.org>
> > > >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
> > > >>>>>>>>> Subject:Re: Checkpointing under backpressure
> > > >>>>>>>>>
> > > >>>>>>>>> yes! It’s quite similar I think.  Though mind that the devil
> is
> > > in
> > > >>> the
> > > >>>>>>>>> details, i.e., the temporal order actions are taken.
> > > >>>>>>>>>
> > > >>>>>>>>> To clarify, let us say you have a task T with two input
> > channels
> > > I1
> > > >>>>> and I2.
> > > >>>>>>>>> The Chandy Lamport execution flow is the following:
> > > >>>>>>>>>
> > > >>>>>>>>> 1) T receives barrier from  I1 and...
> > > >>>>>>>>> 2)  ...the following three actions happen atomically
> > > >>>>>>>>> I )  T snapshots its state T*
> > > >>>>>>>>> II)  T forwards marker to its outputs
> > > >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer
> M*
> > > >>>>>>>>> - Also notice here that T does NOT block I1 as it does in
> > aligned
> > > >>>>>>>>> snapshots -
> > > >>>>>>>>> 3) Eventually T receives barrier from I2 and stops recording
> > > events.
> > > >>>>> Its
> > > >>>>>>>>> asynchronously captured snapshot is now complete: {T*,M*}.
> > > >>>>>>>>> Upon recovery all messages of M* should be replayed in FIFO
> > > order.
> > > >>>>>>>>>
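For readers following along, a minimal Python sketch of steps 1) to 3) above. It is a toy model under assumptions, not Flink code: Task, BARRIER, on_event and recover are hypothetical names, the operator state is just a running sum of the records, and the three actions of step 2 are "atomic" only in the sense that they happen inside one single-threaded call.

```python
from collections import deque

BARRIER = object()  # hypothetical stand-in for a checkpoint barrier / marker


class Task:
    """Toy task T that snapshots on the first barrier and logs pending channels."""

    def __init__(self, inputs):
        self.inputs = set(inputs)
        self.state = 0        # operator state: running sum of processed records
        self.snapshot = None  # T*
        self.pending = set()  # channels whose barrier has not arrived yet
        self.log = {}         # M*: channel -> records logged in FIFO order
        self.forwarded = 0    # number of markers forwarded downstream

    def on_event(self, channel, event):
        if event is BARRIER:
            if self.snapshot is None:
                # Step 2: first barrier. Snapshot, forward marker, start logging.
                self.snapshot = self.state
                self.forwarded += 1
                self.pending = self.inputs - {channel}
                self.log = {c: deque() for c in self.pending}
            else:
                # Step 3: a later barrier ends logging for that channel.
                self.pending.discard(channel)
            return
        if channel in self.pending:
            self.log[channel].append(event)  # pre-barrier record, part of M*
        self.state += event                  # no channel is ever blocked

    def snapshot_complete(self):
        return self.snapshot is not None and not self.pending


def recover(snapshot, log):
    """Rebuild the state from {T*, M*} by replaying logged records in FIFO order."""
    state = snapshot
    for records in log.values():
        for record in records:
            state += record
    return state
```

With inputs I1 and I2, records that arrive on I1 after its barrier are processed but not logged, while records on I2 are both processed and logged until I2's barrier arrives. recover(T*, M*) then yields exactly the state that reflects all pre-barrier records of both channels.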
> > > >>>>>>>>> With this approach alignment does not create a deadlock
> > situation
> > > >>>>> since
> > > >>>>>>>>> anyway 2.II happens asynchronously and messages can be logged
> > as
> > > >>> well
> > > >>>>>>>>> asynchronously during the process of the snapshot. If there
> is
> > > >>>>>>>>> back-pressure in a pipeline the cause is most probably not
> this
> > > >>>>> algorithm.
> > > >>>>>>>>>
> > > >>>>>>>>> Back to your observation, the answer : yes and no.  In your
> > > network
> > > >>>>> model,
> > > >>>>>>>>> I can see the logic of “logging” and “committing” a final
> > > snapshot
> > > >>>>> being
> > > >>>>>>>>> provided by the channel implementation. However, do mind that
> > the
> > > >>>>> first
> > > >>>>>>>>> barrier always needs to go “all the way” to initiate the
> Chandy
> > > >>>>> Lamport
> > > >>>>>>>>> algorithm logic.
> > > >>>>>>>>>
> > > >>>>>>>>> The above flow has been proven using temporal logic in my phd
> > > thesis
> > > >>>>> in
> > > >>>>>>>>> case you are interested about the proof.
> > > >>>>>>>>> I hope this helps a little clarifying things. Let me know if
> > > there
> > > >>> is
> > > >>>>> any
> > > >>>>>>>>> confusing point to disambiguate. I would be more than happy
> to
> > > help
> > > >>>>> if I
> > > >>>>>>>>> can.
> > > >>>>>>>>>
> > > >>>>>>>>> Paris
> > > >>>>>>>>>
> > > >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <
> pi...@ververica.com
> > >
> > > >>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots
> > > don’t
> > > >>>>> you
> > > >>>>>>>>> still have to wait for the “checkpoint barrier” to arrive in
> > > order
> > > >>> to
> > > >>>>> know
> > > >>>>>>>>> when you have already received all possible messages from the
> > > >>> upstream
> > > >>>>>>>>> tasks/operators? So instead of processing the “in flight”
> > > messages
> > > >>>>> (as the
> > > >>>>>>>>> Flink is doing currently), you are sending them to an
> > “observer”?
> > > >>>>>>>>>>
> > > >>>>>>>>>> In that case, that sounds similar to “checkpoint barriers
> > > >>>>> overtaking
> > > >>>>>>>>> in flight records” (aka unaligned checkpoints). Just for us,
> > the
> > > >>>>> observer
> > > >>>>>>>>> is a snapshot state.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Piotrek
> > > >>>>>>>>>>
> > > >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <
> > > seniorcarb...@gmail.com>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe
> Chandy-Lamport
> > > >>>>> snapshots
> > > >>>>>>>>> [1] would help out solve this problem more elegantly without
> > > >>>>> sacrificing
> > > >>>>>>>>> correctness.
> > > >>>>>>>>>>> - They do not need alignment, only (async) logging for
> > > in-flight
> > > >>>>>>>>> records between the time the first barrier is processed until
> > the
> > > >>> last
> > > >>>>>>>>> barrier arrives in a task.
> > > >>>>>>>>>>> - They work fine for failure recovery as long as logged
> > records
> > > >>> are
> > > >>>>>>>>> replayed on startup.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Flink’s “aligned” savepoints would probably still be
> > necessary
> > > >>> for
> > > >>>>>>>>> transactional sink commits + any sort of reconfiguration
> (e.g.,
> > > >>>>> rescaling,
> > > >>>>>>>>> updating the logic of operators to evolve an application
> etc.).
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I don’t completely understand the “overtaking” approach but
> > if
> > > you
> > > >>>>> have
> > > >>>>>>>>> a concrete definition I would be happy to check it out and
> help
> > > if I
> > > >>>>> can!
> > > >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging
> > > things
> > > >>> in
> > > >>>>>>>>> pending channels in a task snapshot before the barrier
> arrives.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> -Paris
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1]
> > > >>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <
> > pi...@ververica.com
> > > >
> > > >>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Thomas,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of
> > > >>> discussing
> > > >>>>> how
> > > >>>>>>>>> to address this issue and one of the solutions that we are
> > > discussing
> > > >>>>> is
> > > >>>>>>>>> exactly what you are proposing: checkpoint barriers
> overtaking
> > > the
> > > >>> in
> > > >>>>>>>>> flight data and making the in flight data part of the
> checkpoint.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If everything works well, we will be able to present
> result
> > of
> > > >>> our
> > > >>>>>>>>> discussions on the dev mailing list soon.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <
> > > wangzhijiang...@aliyun.com
> > > >>>>> .INVALID>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Thomas,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for proposing this concern. The barrier alignment
> > > takes
> > > >>>>> long
> > > >>>>>>>>> time in backpressure case which could cause several problems:
> > > >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned.
> > > >>>>>>>>>>>>> 2. The recovery cost is high on failover, because much
> > data
> > > >>>>> needs
> > > >>>>>>>>> to be replayed.
> > > >>>>>>>>>>>>> 3. The delay for commit-based sink is high in
> exactly-once.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> For credit-based flow control from release-1.5, the
> amount
> > of
> > > >>>>>>>>> in-flight buffers before barrier alignment is reduced, so
> we
> > > >>> could
> > > >>>>> get a
> > > >>>>>>>>> bit
> > > >>>>>>>>>>>>> benefits from speeding checkpoint aspect.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the channels
> > which
> > > >>>>> already
> > > >>>>>>>>> received the barrier in practice. But actually we once did
> the
> > > >>>>> similar thing
> > > >>>>>>>>>>>>> to speed barrier alignment before. I am not quite sure
> that
> > > >>>>>>>>> release-1.8 covers this feature. There were some relevant
> > > >>> discussions
> > > >>>>> under
> > > >>>>>>>>> jira [1].
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> For release-1.10, the community is now discussing the
> > > feature of
> > > >>>>>>>>> unaligned checkpoint which is mainly for resolving above
> > > concerns.
> > > >>> The
> > > >>>>>>>>> basic idea
> > > >>>>>>>>>>>>> is to make the barrier overtake the output/input buffer
> queue
> > to
> > > >>>>> speed
> > > >>>>>>>>> alignment, and snapshot the input/output buffers as part of
> > > >>> checkpoint
> > > >>>>>>>>> state. The
> > > >>>>>>>>>>>>> details have not been confirmed yet and are still under
> > discussion.
> > > >>>>> Wish we
> > > >>>>>>>>> could make some improvements for the release-1.10.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Zhijiang
> > > >>>>>>>>>>>>>
> > > >>> ------------------------------------------------------------------
> > > >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org>
> > > >>>>>>>>>>>>> Send Time: Monday, 12 Aug 2019, 21:38
> > > >>>>>>>>>>>>> To:dev <dev@flink.apache.org>
> > > >>>>>>>>>>>>> Subject:Checkpointing under backpressure
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> One of the major operational difficulties we observe with
> > > Flink
> > > >>>>> are
> > > >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for
> > both
> > > >>>>>>>>> confirmation
> > > >>>>>>>>>>>>> of my understanding of the current behavior as well as
> > > pointers
> > > >>>>> for
> > > >>>>>>>>> future
> > > >>>>>>>>>>>>> improvement work:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Prior to introduction of credit based flow control in the
> > > >>> network
> > > >>>>>>>>> stack [1]
> > > >>>>>>>>>>>>> [2], checkpoint barriers would back up with the data for
> > all
> > > >>>>> logical
> > > >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the
> > > buffers
> > > >>> are
> > > >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are only
> > held
> > > >>>>> back for
> > > >>>>>>>>>>>>> channels that have backpressure, while others can
> continue
> > > >>>>> processing
> > > >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot
> > "overtake
> > > >>>>> data",
> > > >>>>>>>>>>>>> therefore checkpoint alignment remains affected for the
> > > channel
> > > >>>>> with
> > > >>>>>>>>>>>>> backpressure, with the potential for slow checkpointing
> and
> > > >>>>> timeouts.
> > > >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the
> maximum
> > > >>>>> in-transit
> > > >>>>>>>>>>>>> buffers per channel, resulting in an improvement compared
> > to
> > > >>>>> previous
> > > >>>>>>>>>>>>> versions of Flink. Also, the backpressure based
> checkpoint
> > > >>>>> alignment
> > > >>>>>>>>> can
> > > >>>>>>>>>>>>> help the barrier advance faster on the receiver side (by
> > > >>>>> suspending
> > > >>>>>>>>>>>>> channels that have already delivered the barrier). Is
> that
> > > >>>>> accurate
> > > >>>>>>>>> as of
> > > >>>>>>>>>>>>> Flink 1.8?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> What appears to be missing to completely unblock
> > > checkpointing
> > > >>> is
> > > >>>>> a
> > > >>>>>>>>>>>>> mechanism for checkpoints to overtake the data. That
> would
> > > help
> > > >>> in
> > > >>>>>>>>>>>>> situations where the processing itself is the bottleneck
> > and
> > > >>>>>>>>> prioritization
> > > >>>>>>>>>>>>> in the network stack alone cannot address the barrier
> > delay.
> > > Was
> > > >>>>>>>>> there any
> > > >>>>>>>>>>>>> related discussion? One possible solution would be to
> drain
> > > >>>>> incoming
> > > >>>>>>>>> data
> > > >>>>>>>>>>>>> till the barrier and make it part of the checkpoint
> instead
> > > of
> > > >>>>>>>>> processing
> > > >>>>>>>>>>>>> it. This is somewhat related to asynchronous processing,
> > but
> > > I'm
> > > >>>>>>>>> thinking
> > > >>>>>>>>>>>>> more of a solution that is automated in the Flink runtime
> > for
> > > >>> the
> > > >>>>>>>>>>>>> backpressure scenario only.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>> Thomas
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1]
> > > >>> https://flink.apache.org/2019/06/05/flink-network-stack.html
> > > >>>>>>>>>>>>> [2]
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>
> > > >>>
> > >
> >
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>
> > > >>>
> > > >
> > >
> > >
> >
> >
>
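
For readers following the quoted thread, the Chandy-Lamport-style flow Paris
describes (snapshot the task state on the first barrier, then log in-flight
records on channels still waiting for their barrier until the last barrier
arrives) could be sketched roughly as below. This is only a toy model under
stated assumptions: the Task class, its fields and the running-sum "state" are
hypothetical illustrations, not Flink code or the design agreed in this thread,
and it handles a single checkpoint only.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Task:
    """Toy task with several input channels (hypothetical, not Flink code)."""
    channels: list
    state: int = 0                                # running sum of processed records
    snapshot_state: Optional[int] = None          # state captured on first barrier
    logged: dict = field(default_factory=dict)    # channel -> logged in-flight records
    pending: set = field(default_factory=set)     # channels still missing a barrier

    def on_record(self, channel, value):
        # Records are always processed; no channel is blocked for alignment.
        self.state += value
        # A record on a channel whose barrier has not arrived yet still
        # belongs to the checkpoint, so it is logged as in-flight data.
        if self.snapshot_state is not None and channel in self.pending:
            self.logged[channel].append(value)

    def on_barrier(self, channel):
        if self.snapshot_state is None:
            # First barrier: snapshot immediately, start logging other channels.
            self.snapshot_state = self.state
            self.pending = set(self.channels) - {channel}
            self.logged = {c: [] for c in self.pending}
        else:
            self.pending.discard(channel)
        # True once a barrier has arrived on every channel.
        return not self.pending

    def restored_state(self):
        # Recovery: restore the snapshot, then replay the logged records.
        return self.snapshot_state + sum(sum(v) for v in self.logged.values())
```

In this sketch, a record that arrives on a channel after that channel's barrier
is processed but not logged, since it belongs to the next checkpoint. Restoring
the snapshot and replaying the logged records should give the same state an
aligned checkpoint would have captured.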
