Thanks for these great points and discussions!

1. Regarding triggering the checkpoint via RPC calls to all tasks, as in 
Chandy-Lamport: it combines two different mechanisms so that the trigger is 
fast in different scenarios.
But in Flink it might not be worth going that way, as Stephan's analysis 
shows. Another concern is that it puts a heavier load on the JobMaster, which 
has to broadcast the checkpoint RPC to all tasks of a large-scale job, 
especially with a very short checkpoint interval. Furthermore, it could delay 
the execution of other important RPCs and bring potential timeout risks.
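
As a rough illustration of the load concern, here is a back-of-envelope sketch 
comparing the trigger RPCs per second for "trigger sources" versus "trigger 
all". The task counts and the interval are hypothetical numbers chosen for 
illustration, not measurements, and the function name is made up.

```python
def trigger_rpcs_per_second(num_triggered_tasks: int, checkpoint_interval_s: float) -> float:
    """Trigger RPCs the JobMaster sends per second, assuming one RPC per
    triggered task per checkpoint (a simplifying assumption)."""
    return num_triggered_tasks / checkpoint_interval_s


# Hypothetical job: 100 source tasks out of 10,000 tasks, 0.5 s checkpoint interval.
sources_only = trigger_rpcs_per_second(100, 0.5)
all_tasks = trigger_rpcs_per_second(10_000, 0.5)
print(sources_only, all_tasks)
```

Under these assumed numbers the broadcast variant sends 100x more trigger RPCs, 
and they all compete with other RPCs on the JobMaster.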

2. I agree with the idea of borrowing "take state snapshot on first barrier" 
from Chandy-Lamport, in place of barrier alignment, and combining it with 
unaligned checkpoints in Flink.

> >>>> The benefit would be less latency increase in the channels which already 
> >>>> have received barriers.
> >>>> However, as mentioned before, not prioritizing the inputs from which 
> >>>> barriers are still missing can also have an adverse effect.

I think there will be no adverse effect from not prioritizing the inputs 
without barriers in this case. After the sync snapshot, the task can process 
any input channel. For the input channel that received the first barrier, we 
already have an obvious boundary for persisting buffers. For the other channels 
without barriers, we can persist the subsequent buffers of these channels until 
the barrier arrives from the network. With credit-based flow control, the 
barrier does not need credit to be transported, so as long as the barrier 
overtakes the sender's output queue, the network stack will transport it 
immediately regardless of the input conditions on the receiver side. So there 
is no need to consume the accumulated buffers in these channels with higher 
priority. If so, it seems we will not waste any CPU cycles, which was Piotr's 
earlier concern.
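
To make the flow concrete, here is a minimal sketch of "snapshot on first 
barrier, persist buffers of the channels whose barrier is still missing". It is 
a toy model in Python, not Flink code: the Task class, its fields and the 
summing "operator" are all hypothetical, and it handles a single checkpoint 
only.

```python
from dataclasses import dataclass, field
from typing import Optional

BARRIER = object()  # stand-in for a checkpoint barrier


@dataclass
class Task:
    """Toy task that sums its input records (hypothetical, single checkpoint)."""
    num_channels: int
    state: int = 0
    snapshot: Optional[int] = None                    # state captured on the first barrier
    pending: set = field(default_factory=set)         # channels whose barrier is still missing
    channel_log: dict = field(default_factory=dict)   # persisted in-flight records per channel
    forwarded: bool = False
    complete: bool = False

    def on_input(self, channel: int, item) -> None:
        if item is BARRIER:
            if self.snapshot is None:
                # First barrier: snapshot state, forward the barrier,
                # and start logging every other channel.
                self.snapshot = self.state
                self.forwarded = True
                self.pending = set(range(self.num_channels))
                self.channel_log = {c: [] for c in self.pending if c != channel}
            self.pending.discard(channel)
            self.complete = not self.pending
            return
        if self.snapshot is not None and channel in self.pending:
            # Record arrived ahead of this channel's barrier: persist it.
            self.channel_log[channel].append(item)
        # No channel is blocked or prioritized: every record is processed.
        self.state += item


task = Task(num_channels=2)
task.on_input(0, 1)
task.on_input(1, 10)
task.on_input(0, BARRIER)  # first barrier: snapshot taken here
task.on_input(0, 2)        # channel 0 already has its barrier: processed, not logged
task.on_input(1, 20)       # channel 1 still pending: processed and logged
task.on_input(1, BARRIER)  # last barrier: checkpoint complete
print(task.snapshot, task.channel_log, task.state, task.complete)
```

In this model the task never blocks or prioritizes a channel; the checkpoint is 
the snapshot plus the logged records, which would be replayed on recovery.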

3. Suppose unaligned checkpoints perform well in practice: is it possible to 
make them the only mechanism for handling all cases? I mean, in the 
non-backpressure scenario there are few or even no buffers in the input/output 
queues, so "overtaking barrier -> trigger snapshot on first barrier -> persist 
buffers" might still work well. Then we would not need to maintain two sets of 
mechanisms in the end.
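
A small sketch of why one mechanism might cover both cases: when the barrier 
overtakes an output queue, only the buffers it jumps over need persisting, so 
an empty queue persists nothing. This is a toy model in Python; the function 
name, buffer names and barrier id are hypothetical.

```python
from collections import deque


def overtake(output_queue: deque, barrier: str) -> list:
    """Put the barrier at the head of the queue and return the buffers it
    jumped over; those are what the checkpoint has to persist."""
    overtaken = list(output_queue)
    output_queue.appendleft(barrier)  # barrier is now sent before any queued buffer
    return overtaken


backpressured = deque(["b1", "b2", "b3"])  # queue filled up under backpressure
idle = deque()                             # freely flowing pipeline, nothing queued

persisted_backpressured = overtake(backpressured, "barrier-7")
persisted_idle = overtake(idle, "barrier-7")
print(persisted_backpressured, persisted_idle)
```

In the idle case nothing is persisted, so the result looks the same as an 
aligned checkpoint without in-flight data.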

4. The initial motivation of this discussion is checkpoint timeout in the 
backpressure scenario. If we set the default timeout to a very large value, 
the checkpoint would never time out and we would only need to wait for it to 
finish. Are there then any other problems/concerns if a checkpoint takes a 
long time to finish? Although we already know of some issues, it is better to 
gather more user feedback to confirm which aspects can be solved by this 
feature design. E.g. the sink commit delay might not be covered by the 
unaligned solution.

Best,
Zhijiang
------------------------------------------------------------------
From:Stephan Ewen <se...@apache.org>
Send Time: 14 Aug 2019 (Wednesday) 17:43
To:dev <dev@flink.apache.org>
Subject:Re: Checkpointing under backpressure

Quick note: The current implementation is

Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)

On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote:

> > Thanks for the great ideas so far.
>
> +1
>
> Regarding other things raised, I mostly agree with Stephan.
>
> I like the idea of simultaneously starting the checkpoint everywhere via
> RPC call (especially in cases where Tasks are busy doing some synchronous
> operations for example for tens of milliseconds. In that case every network
> exchange adds tens of milliseconds of delay in propagating the checkpoint).
> However I agree that this might be a premature optimisation assuming the
> current state of our code (we already have checkpoint barriers).
>
> However I like the idea of switching from:
>
> 1. A -> S -> F (Align -> snapshot -> forward markers)
>
> To
>
> 2. S -> F -> L (Snapshot -> forward markers -> log pending channels)
>
> Or even to
>
> 6. F -> S -> L (Forward markers -> snapshot -> log pending channels)
>
> It feels to me like this would decouple propagation of checkpoints from
> costs of synchronous snapshots and waiting for all of the checkpoint
> barriers to arrive (even if they will overtake in-flight records, this
> might take some time).
>
> > What I like about the Chandy Lamport approach (2.) initiated from
> sources is that:
> >       - Snapshotting imposes no modification to normal processing.
>
> Yes, I agree that would be nice. Currently, during the alignment and
> blocking of the input channels, we might be wasting CPU cycles of up stream
> tasks. If we succeed in designing new checkpointing mechanism to not
> disrupt/block regular data processing (% the extra IO cost for logging the
> in-flight records), that would be a huge improvement.
>
> Piotrek
>
> > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote:
> >
> > Sure I see. In cases when no periodic aligned snapshots are employed
> this is the only option.
> >
> > Two things that were not highlighted enough so far on the proposed
> protocol (included my mails):
> >       - The Recovery/Reconfiguration strategy should strictly prioritise
> processing logged events before entering normal task input operation.
> Otherwise causality can be violated. This also means dataflow recovery will
> be expected to be slower to the one employed on an aligned snapshot.
> >       - Same as with state capture, markers should be forwarded upon
> first marker received on input. No later than that. Otherwise we have
> duplicate side effects.
> >
> > Thanks for the great ideas so far.
> >
> > Paris
> >
> >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Scaling with unaligned checkpoints might be a necessity.
> >>
> >> Let's assume the job failed due to a lost TaskManager, but no new
> >> TaskManager becomes available.
> >> In that case we need to scale down based on the latest complete
> checkpoint,
> >> because we cannot produce a new checkpoint.
> >>
> >>
> >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com>
> >> wrote:
> >>
> >>> +1 I think we are on the same page Stephan.
> >>>
> >>> Rescaling on unaligned checkpoint sounds challenging and a bit
> >>> unnecessary. No?
> >>> Why not sticking to aligned snapshots for live
> reconfiguration/rescaling?
> >>> It’s a pretty rare operation and it would simplify things by a lot.
> >>> Everything can be “staged” upon alignment including replacing channels
> and
> >>> tasks.
> >>>
> >>> -Paris
> >>>
> >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> >>>>
> >>>> Hi all!
> >>>>
> >>>> Yes, the first proposal of "unaligend checkpoints" (probably two years
> >>> back
> >>>> now) drew a major inspiration from Chandy Lamport, as did actually the
> >>>> original checkpointing algorithm.
> >>>>
> >>>> "Logging data between first and last barrier" versus "barrier jumping
> >>> over
> >>>> buffer and storing those buffers" is pretty close same.
> >>>> However, there are a few nice benefits of the proposal of unaligned
> >>>> checkpoints over Chandy-Lamport.
> >>>>
> >>>> *## Benefits of Unaligned Checkpoints*
> >>>>
> >>>> (1) It is very similar to the original algorithm (can be seen an an
> >>>> optional feature purely in the network stack) and thus can share
> lot's of
> >>>> code paths.
> >>>>
> >>>> (2) Less data stored. If we make the "jump over buffers" part timeout
> >>> based
> >>>> (for example barrier overtakes buffers if not flushed within 10ms)
> then
> >>>> checkpoints are in the common case of flowing pipelines aligned
> without
> >>>> in-flight data. Only back pressured cases store some in-flight data,
> >>> which
> >>>> means we don't regress in the common case and only fix the back
> pressure
> >>>> case.
> >>>>
> >>>> (3) Faster checkpoints. Chandy Lamport still waits for all barriers to
> >>>> arrive naturally, logging on the way. If data processing is slow, this
> >>> can
> >>>> still take quite a while.
> >>>>
> >>>> ==> I think both these points are strong reasons to not change the
> >>>> mechanism away from "trigger sources" and start with CL-style "trigger
> >>> all".
> >>>>
> >>>>
> >>>> *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*
> >>>>
> >>>> We can think about something like "take state snapshot on first
> barrier"
> >>>> and then store buffers until the other barriers arrive. Inside the
> >>> network
> >>>> stack, barriers could still overtake and persist buffers.
> >>>> The benefit would be less latency increase in the channels which
> already
> >>>> have received barriers.
> >>>> However, as mentioned before, not prioritizing the inputs from which
> >>>> barriers are still missing can also have an adverse effect.
> >>>>
> >>>>
> >>>> *## Concerning upgrades*
> >>>>
> >>>> I think it is a fair restriction to say that upgrades need to happen
> on
> >>>> aligned checkpoints. It is a rare enough operation.
> >>>>
> >>>>
> >>>> *## Concerning re-scaling (changing parallelism)*
> >>>>
> >>>> We need to support that on unaligned checkpoints as well. There are
> >>> several
> >>>> feature proposals about automatic scaling, especially down scaling in
> >>> case
> >>>> of missing resources. The last snapshot might be a regular
> checkpoint, so
> >>>> all checkpoints need to support rescaling.
> >>>>
> >>>>
> >>>> *## Concerning end-to-end checkpoint duration and "trigger sources"
> >>> versus
> >>>> "trigger all"*
> >>>>
> >>>> I think for the end-to-end checkpoint duration, an "overtake buffers"
> >>>> approach yields faster checkpoints, as mentioned above (Chandy Lamport
> >>>> logging still needs to wait for barrier to flow).
> >>>>
> >>>> I don't see the benefit of a "trigger all tasks via RPC concurrently"
> >>>> approach. Bear in mind that it is still a globally coordinated
> approach
> >>> and
> >>>> you need to wait for the global checkpoint to complete before
> committing
> >>>> any side effects.
> >>>> I believe that the checkpoint time is more determined by the state
> >>>> checkpoint writing, and the global coordination and metadata commit,
> than
> >>>> by the difference in alignment time between "trigger from source and
> jump
> >>>> over buffers" versus "trigger all tasks concurrently".
> >>>>
> >>>> Trying to optimize a few tens of milliseconds out of the network stack
> >>>> sends (and changing the overall checkpointing approach completely for
> >>> that)
> >>>> while staying with a globally coordinated checkpoint will send us
> down a
> >>>> path to a dead end.
> >>>>
> >>>> To really bring task persistence latency down to 10s of milliseconds
> (so
> >>> we
> >>>> can frequently commit in sinks), we need to take an approach without
> any
> >>>> global coordination. Tasks need to establish a persistent recovery
> point
> >>>> individually and at their own discretion, only then can it be frequent
> >>>> enough. To get there, they would need to decouple themselves from the
> >>>> predecessor and successor tasks (via something like persistent
> channels).
> >>>> This is a different discussion, though, somewhat orthogonal to this
> one
> >>>> here.
> >>>>
> >>>> Best,
> >>>> Stephan
> >>>>
> >>>>
> >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com>
> >>> wrote:
> >>>>
> >>>>> Hi again,
> >>>>>
> >>>>> Zhu Zhu let me think about this more. Maybe as Paris is writing, we
> do
> >>> not
> >>>>> need to block any channels at all, at least assuming credit base flow
> >>>>> control. Regarding what should happen with the following checkpoint
> is
> >>>>> another question. Also, should we support concurrent checkpoints and
> >>>>> subsuming checkpoints as we do now? Maybe not…
> >>>>>
> >>>>> Paris
> >>>>>
> >>>>> Re
> >>>>> I. 2. a) and b) - yes, this would have to be taken into an account
> >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint time will
> >>>>> probably be longer than it could be. It might affect external
> systems.
> >>> For
> >>>>> example Kafka, which automatically time outs lingering transactions,
> and
> >>>>> for us, the transaction time is equal to the time between two
> >>> checkpoints.
> >>>>>
> >>>>> II 1. - I’m confused. To make things straight. Flink is currently
> >>>>> snapshotting once it receives all of the checkpoint barriers from
> all of
> >>>>> the input channels and only then it broadcasts the checkpoint barrier
> >>> down
> >>>>> the stream. And this is correct from exactly-once perspective.
> >>>>>
> >>>>> As far as I understand, your proposal based on Chandy Lamport
> algorithm,
> >>>>> is snapshotting the state of the operator on the first checkpoint
> >>> barrier,
> >>>>> which also looks correct to me.
> >>>>>
> >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about
> this.
> >>>>>
> >>>>> V. Yes, we still need aligned checkpoints, as they are easier for
> state
> >>>>> migration and upgrades.
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>> Now I see a little more clearly what you have in mind. Thanks for
> the
> >>>>> explanation!
> >>>>>> There are a few intermixed concepts here, some how to do with
> >>>>> correctness some with performance.
> >>>>>> Before delving deeper I will just enumerate a few things to make
> myself
> >>>>> a little more helpful if I can.
> >>>>>>
> >>>>>> I. Initiation
> >>>>>> -------------
> >>>>>>
> >>>>>> 1. RPC to sources only is a less intrusive way to initiate snapshots
> >>>>> since you utilize better pipeline parallelism (only a small subset of
> >>> tasks
> >>>>> is running progressively the protocol at a time, if snapshotting is
> >>> async
> >>>>> the overall overhead might not even be observable).
> >>>>>>
> >>>>>> 2. If we really want an RPC to all initiation take notice of the
> >>>>> following implications:
> >>>>>>
> >>>>>>    a. (correctness) RPC calls are not guaranteed to arrive in every
> >>>>> task before a marker from a preceding task.
> >>>>>>
> >>>>>>    b. (correctness) Either the RPC call OR the first arriving marker
> >>>>> should initiate the algorithm. Whichever comes first. If you only do
> it
> >>> per
> >>>>> RPC call then you capture a "late" state that includes side effects
> of
> >>>>> already logged events.
> >>>>>>
> >>>>>>    c. (performance) Lots of IO will be invoked at the same time on
> >>>>> the backend store from all tasks. This might lead to high congestion
> in
> >>>>> async snapshots.
> >>>>>>
> >>>>>> II. Capturing State First
> >>>>>> -------------------------
> >>>>>>
> >>>>>> 1. (correctness) Capturing state at the last marker sounds
> incorrect to
> >>>>> me (state contains side effects of already logged events based on the
> >>>>> proposed scheme). This results into duplicate processing. No?
> >>>>>>
> >>>>>> III. Channel Blocking / "Alignment"
> >>>>>> -----------------------------------
> >>>>>>
> >>>>>> 1. (performance?) What is the added benefit? We dont want a
> "complete"
> >>>>> transactional snapshot, async snapshots are purely for
> failure-recovery.
> >>>>> Thus, I dont see why this needs to be imposed at the expense of
> >>>>> performance/throughput. With the proposed scheme the whole dataflow
> >>> anyway
> >>>>> enters snapshotting/logging mode so tasks more or less snapshot
> >>>>> concurrently.
> >>>>>>
> >>>>>> IV Marker Bypassing
> >>>>>> -------------------
> >>>>>>
> >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots so
> with
> >>>>> some quick thinking  correct. I will try to model this later and get
> >>> back
> >>>>> to you in case I find something wrong.
> >>>>>>
> >>>>>> 2. (performance) It also sounds like a meaningful optimisation! I
> like
> >>>>> thinking of this as a push-based snapshot. i.e., the producing task
> >>> somehow
> >>>>> triggers forward a consumer/channel to capture its state. By example
> >>>>> consider T1 -> |marker t1| -> T2.
> >>>>>>
> >>>>>> V. Usage of "Async" Snapshots
> >>>>>> ---------------------
> >>>>>>
> >>>>>> 1. Do you see this as a full replacement of "full" aligned
> >>>>> snapshots/savepoints? In my view async shanpshots will be needed from
> >>> time
> >>>>> to time but not as frequently. Yet, it seems like a valid approach
> >>> solely
> >>>>> for failure-recovery on the same configuration. Here's why:
> >>>>>>
> >>>>>>    a. With original snapshotting there is a strong duality between
> >>>>>>    a stream input (offsets) and committed side effects (internal
> >>>>> states and external commits to transactional sinks). While in the
> async
> >>>>> version, there are uncommitted operations (inflight records). Thus,
> you
> >>>>> cannot use these snapshots for e.g., submitting sql queries with
> >>> snapshot
> >>>>> isolation. Also, the original snapshotting gives a lot of potential
> for
> >>>>> flink to make proper transactional commits externally.
> >>>>>>
> >>>>>>    b. Reconfiguration is very tricky, you probably know that better.
> >>>>> Inflight channel state is no longer valid in a new configuration
> (i.e.,
> >>> new
> >>>>> dataflow graph, new operators, updated operator logic, different
> >>> channels,
> >>>>> different parallelism)
> >>>>>>
> >>>>>> 2. Async snapshots can also be potentially useful for monitoring the
> >>>>> general health of a dataflow since they can be analyzed by the task
> >>> manager
> >>>>> about the general performance of a job graph and spot bottlenecks for
> >>>>> example.
> >>>>>>
> >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thomas:
> >>>>>>> There are no Jira tickets yet (or maybe there is something very old
> >>>>> somewhere). First we want to discuss it, next present FLIP and at
> last
> >>>>> create tickets :)
> >>>>>>>
> >>>>>>>> if I understand correctly, then the proposal is to not block any
> >>>>>>>> input channel at all, but only log data from the backpressured
> >>> channel
> >>>>> (and
> >>>>>>>> make it part of the snapshot) until the barrier arrives
> >>>>>>>
> >>>>>>> I would guess that it would be better to block the reads, unless we
> >>> can
> >>>>> already process the records from the blocked channel…
> >>>>>>>
> >>>>>>> Paris:
> >>>>>>>
> >>>>>>> Thanks for the explanation Paris. I’m starting to understand this
> more
> >>>>> and I like the idea of snapshotting the state of an operator before
> >>>>> receiving all of the checkpoint barriers - this would allow more
> things
> >>> to
> >>>>> happen at the same time instead of sequentially. As Zhijiang has
> pointed
> >>>>> out there are some things not considered in your proposal: overtaking
> >>>>> output buffers, but maybe those things could be incorporated
> together.
> >>>>>>>
> >>>>>>> Another thing is that from the wiki description I understood that
> the
> >>>>> initial checkpointing is not initialised by any checkpoint barrier,
> but
> >>> by
> >>>>> an independent call/message from the Observer. I haven’t played with
> >>> this
> >>>>> idea a lot, but I had some discussion with Nico and it seems that it
> >>> might
> >>>>> work:
> >>>>>>>
> >>>>>>> 1. JobManager sends and RPC “start checkpoint” to all tasks
> >>>>>>> 2. Task (with two input channels l1 and l2) upon receiving RPC from
> >>> 1.,
> >>>>> takes a snapshot of it's state and:
> >>>>>>> a) broadcast checkpoint barrier down the stream to all channels
> (let’s
> >>>>> ignore for a moment potential for this barrier to overtake the buffer
> >>>>> output data)
> >>>>>>> b) for any input channel for which it hasn’t yet received
> checkpoint
> >>>>> barrier, the data are being added to the checkpoint
> >>>>>>> c) once a channel (for example l1) receives a checkpoint barrier,
> the
> >>>>> Task blocks reads from that channel (?)
> >>>>>>> d) after all remaining channels (l2) receive checkpoint barriers,
> the
> >>>>> Task  first has to process the buffered data after that it can
> unblock
> >>> the
> >>>>> reads from the channels
> >>>>>>>
> >>>>>>> Checkpoint barriers do not cascade/flow through different tasks
> here.
> >>>>> Checkpoint barrier emitted from Task1, reaches only the immediate
> >>>>> downstream Tasks. Thanks to this setup, total checkpointing time is
> not
> >>> sum
> >>>>> of checkpointing times of all Tasks one by one, but more or less max
> of
> >>> the
> >>>>> slowest Tasks. Right?
> >>>>>>>
> >>>>>>> Couple of intriguing thoughts are:
> >>>>>>> 3. checkpoint barriers overtaking the output buffers
> >>>>>>> 4. can we keep processing some data (in order to not waste CPU
> cycles)
> >>>>> after we have taking the snapshot of the Task. I think we could.
> >>>>>>>
> >>>>>>> Piotrek
> >>>>>>>
> >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Great discussion! I'm excited that this is already under
> >>>>> consideration! Are
> >>>>>>>> there any JIRAs or other traces of discussion to follow?
> >>>>>>>>
> >>>>>>>> Paris, if I understand correctly, then the proposal is to not
> block
> >>> any
> >>>>>>>> input channel at all, but only log data from the backpressured
> >>> channel
> >>>>> (and
> >>>>>>>> make it part of the snapshot) until the barrier arrives? This is
> >>>>>>>> intriguing. But probably there is also a benefit of to not
> continue
> >>>>> reading
> >>>>>>>> I1 since that could speed up retrieval from I2. Also, if the user
> >>> code
> >>>>> is
> >>>>>>>> the cause of backpressure, this would avoid pumping more data into
> >>> the
> >>>>>>>> process function.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <
> wangzhijiang...@aliyun.com
> >>>>> .invalid>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Paris,
> >>>>>>>>>
> >>>>>>>>> Thanks for the detailed sharing. And I think it is very similar
> with
> >>>>> the
> >>>>>>>>> way of overtaking we proposed before.
> >>>>>>>>>
> >>>>>>>>> There are some tiny difference:
> >>>>>>>>> The way of overtaking might need to snapshot all the input/output
> >>>>> queues.
> >>>>>>>>> Chandy Lamport seems only need to snaphost (n-1) input channels
> >>> after
> >>>>> the
> >>>>>>>>> first barrier arrives, which might reduce the state sizea bit.
> But
> >>>>> normally
> >>>>>>>>> there should be less buffers for the first input channel with
> >>> barrier.
> >>>>>>>>> The output barrier still follows with regular data stream in
> Chandy
> >>>>>>>>> Lamport, the same way as current flink. For overtaking way, we
> need
> >>>>> to pay
> >>>>>>>>> extra efforts to make barrier transport firstly before outque
> queue
> >>> on
> >>>>>>>>> upstream side, and change the way of barrier alignment based on
> >>>>> receiving
> >>>>>>>>> instead of current reading on downstream side.
> >>>>>>>>> In the backpressure caused by data skew, the first barrier in
> almost
> >>>>> empty
> >>>>>>>>> input channel should arrive much eariler than the last heavy load
> >>>>> input
> >>>>>>>>> channel, so the Chandy Lamport could benefit well. But for the
> case
> >>>>> of all
> >>>>>>>>> balanced heavy load input channels, I mean the first arrived
> barrier
> >>>>> might
> >>>>>>>>> still take much time, then the overtaking way could still fit
> well
> >>> to
> >>>>> speed
> >>>>>>>>> up checkpoint.
> >>>>>>>>> Anyway, your proposed suggestion is helpful on my side,
> especially
> >>>>>>>>> considering some implementation details .
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Zhijiang
> >>>>>>>>>
> ------------------------------------------------------------------
> >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
> >>>>>>>>> Send Time: 13 Aug 2019 (Tuesday) 14:03
> >>>>>>>>> To:dev <dev@flink.apache.org>
> >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
> >>>>>>>>> Subject:Re: Checkpointing under backpressure
> >>>>>>>>>
> >>>>>>>>> yes! It’s quite similar I think.  Though mind that the devil is
> in
> >>> the
> >>>>>>>>> details, i.e., the temporal order actions are taken.
> >>>>>>>>>
> >>>>>>>>> To clarify, let us say you have a task T with two input channels
> I1
> >>>>> and I2.
> >>>>>>>>> The Chandy Lamport execution flow is the following:
> >>>>>>>>>
> >>>>>>>>> 1) T receives barrier from  I1 and...
> >>>>>>>>> 2)  ...the following three actions happen atomically
> >>>>>>>>> I )  T snapshots its state T*
> >>>>>>>>> II)  T forwards marker to its outputs
> >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer M*
> >>>>>>>>> - Also notice here that T does NOT block I1 as it does in aligned
> >>>>>>>>> snapshots -
> >>>>>>>>> 3) Eventually T receives barrier from I2 and stops recording
> events.
> >>>>> Its
> >>>>>>>>> asynchronously captured snapshot is now complete: {T*,M*}.
> >>>>>>>>> Upon recovery all messages of M* should be replayed in FIFO
> order.
> >>>>>>>>>
> >>>>>>>>> With this approach alignment does not create a deadlock situation
> >>>>> since
> >>>>>>>>> anyway 2.II happens asynchronously and messages can be logged as
> >>> well
> >>>>>>>>> asynchronously during the process of the snapshot. If there is
> >>>>>>>>> back-pressure in a pipeline the cause is most probably not this
> >>>>> algorithm.
> >>>>>>>>>
> >>>>>>>>> Back to your observation, the answer : yes and no.  In your
> network
> >>>>> model,
> >>>>>>>>> I can see the logic of “logging” and “committing” a final
> snapshot
> >>>>> being
> >>>>>>>>> provided by the channel implementation. However, do mind that the
> >>>>> first
> >>>>>>>>> barrier always needs to go “all the way” to initiate the Chandy
> >>>>> Lamport
> >>>>>>>>> algorithm logic.
> >>>>>>>>>
> >>>>>>>>> The above flow has been proven using temporal logic in my phd
> thesis
> >>>>> in
> >>>>>>>>> case you are interested about the proof.
> >>>>>>>>> I hope this helps a little clarifying things. Let me know if
> there
> >>> is
> >>>>> any
> >>>>>>>>> confusing point to disambiguate. I would be more than happy to
> help
> >>>>> if I
> >>>>>>>>> can.
> >>>>>>>>>
> >>>>>>>>> Paris
> >>>>>>>>>
> >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com>
> >>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots
> don’t
> >>>>> you
> >>>>>>>>> still have to wait for the “checkpoint barrier” to arrive in
> order
> >>> to
> >>>>> know
> >>>>>>>>> when have you already received all possible messages from the
> >>> upstream
> >>>>>>>>> tasks/operators? So instead of processing the “in flight”
> messages
> >>>>> (as the
> >>>>>>>>> Flink is doing currently), you are sending them to an “observer”?
> >>>>>>>>>>
> >>>>>>>>>> In that case, that’s sounds similar to “checkpoint barriers
> >>>>> overtaking
> >>>>>>>>> in flight records” (aka unaligned checkpoints). Just for us, the
> >>>>> observer
> >>>>>>>>> is a snapshot state.
> >>>>>>>>>>
> >>>>>>>>>> Piotrek
> >>>>>>>>>>
> >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <
> seniorcarb...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
> >>>>>>>>>>>
> >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport
> >>>>> snapshots
> >>>>>>>>> [1] would help out solve this problem more elegantly without
> >>>>> sacrificing
> >>>>>>>>> correctness.
> >>>>>>>>>>> - They do not need alignment, only (async) logging for
> in-flight
> >>>>>>>>> records between the time the first barrier is processed until the
> >>> last
> >>>>>>>>> barrier arrives in a task.
> >>>>>>>>>>> - They work fine for failure recovery as long as logged records
> >>> are
> >>>>>>>>> replayed on startup.
> >>>>>>>>>>>
> >>>>>>>>>>> Flink’s “alligned” savepoints would probably be still necessary
> >>> for
> >>>>>>>>> transactional sink commits + any sort of reconfiguration (e.g.,
> >>>>> rescaling,
> >>>>>>>>> updating the logic of operators to evolve an application etc.).
> >>>>>>>>>>>
> >>>>>>>>>>> I don’t completely understand the “overtaking” approach but if
> you
> >>>>> have
> >>>>>>>>> a concrete definition I would be happy to check it out and help
> if I
> >>>>> can!
> >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging
> things
> >>> in
> >>>>>>>>> pending channels in a task snapshot before the barrier arrives.
> >>>>>>>>>>>
> >>>>>>>>>>> -Paris
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> >>>>> <
> >>>>>>>>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com
> >
> >>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>
> >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of
> >>> discussing
> >>>>> how
> >>>>>>>>> to address this issue and one of the solution that we are
> discussing
> >>>>> is
> >>>>>>>>> exactly what you are proposing: checkpoint barriers overtaking
> the
> >>> in
> >>>>>>>>> flight data and make the in flight data part of the checkpoint.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If everything works well, we will be able to present result of
> >>> our
> >>>>>>>>> discussions on the dev mailing list soon.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <
> wangzhijiang...@aliyun.com
> >>>>> .INVALID>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for proposing this concern. The barrier alignment
> takes
> >>>>> long
> >>>>>>>>> time in backpressure case which could cause several problems:
> >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned.
> >>>>>>>>>>>>> 2. The recovery cost is high once failover, because much data
> >>>>> needs
> >>>>>>>>> to be replayed.
> >>>>>>>>>>>>> 3. The delay for commit-based sink is high in exactly-once.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For credit-based flow control from release-1.5, the amount of
> >>>>>>>>> in-flighting buffers before barrier alignment is reduced, so we
> >>> could
> >>>>> get a
> >>>>>>>>> bit
> >>>>>>>>>>>>> benefits from speeding checkpoint aspect.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the channels which
> >>>>> already
> >>>>>>>>> received the barrier in practice. But actually we ever did the
> >>>>> similar thing
> >>>>>>>>>>>>> to speed barrier alighment before. I am not quite sure that
> >>>>>>>>> release-1.8 covers this feature. There were some relevant
> >>> discussions
> >>>>> under
> >>>>>>>>> jira [1].
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For release-1.10, the community is now discussing the
> feature of
> >>>>>>>>> unaligned checkpoint which is mainly for resolving above
> concerns.
> >>> The
> >>>>>>>>> basic idea
> >>>>>>>>>>>>> is to make barrier overtakes the output/input buffer queue to
> >>>>> speed
> >>>>>>>>> alignment, and snapshot the input/output buffers as part of
> >>> checkpoint
> >>>>>>>>> state. The
> >>>>>>>>>>>>> details have not confirmed yet and is still under discussion.
> >>>>> Wish we
> >>>>>>>>> could make some improvments for the release-1.10.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Zhijiang
> >>>>>>>>>>>>>
> >>> ------------------------------------------------------------------
> >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org>
> >>>>>>>>>>>>> Send Time: 12 Aug 2019 (Monday) 21:38
> >>>>>>>>>>>>> To:dev <dev@flink.apache.org>
> >>>>>>>>>>>>> Subject:Checkpointing under backpressure
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> One of the major operational difficulties we observe with
> Flink
> >>>>> are
> >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for both
> >>>>>>>>> confirmation
> >>>>>>>>>>>>> of my understanding of the current behavior as well as
> pointers
> >>>>> for
> >>>>>>>>> future
> >>>>>>>>>>>>> improvement work:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Prior to introduction of credit based flow control in the
> >>> network
> >>>>>>>>> stack [1]
> >>>>>>>>>>>>> [2], checkpoint barriers would back up with the data for all
> >>>>> logical
> >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the
> buffers
> >>> are
> >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are only held
> >>>>> back for
> >>>>>>>>>>>>> channels that have backpressure, while others can continue
> >>>>> processing
> >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot "overtake
> >>>>> data",
> >>>>>>>>>>>>> therefore checkpoint alignment remains affected for the
> channel
> >>>>> with
> >>>>>>>>>>>>> backpressure, with the potential for slow checkpointing and
> >>>>> timeouts.
> >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the maximum
> >>>>> in-transit
> >>>>>>>>>>>>> buffers per channel, resulting in an improvement compared to
> >>>>> previous
> >>>>>>>>>>>>> versions of Flink. Also, the backpressure based checkpoint
> >>>>> alignment
> >>>>>>>>> can
> >>>>>>>>>>>>> help the barrier advance faster on the receiver side (by
> >>>>> suspending
> >>>>>>>>>>>>> channels that have already delivered the barrier). Is that
> >>>>> accurate
> >>>>>>>>> as of
> >>>>>>>>>>>>> Flink 1.8?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What appears to be missing to completely unblock
> checkpointing
> >>> is
> >>>>> a
> >>>>>>>>>>>>> mechanism for checkpoints to overtake the data. That would
> help
> >>> in
> >>>>>>>>>>>>> situations where the processing itself is the bottleneck and
> >>>>>>>>> prioritization
> >>>>>>>>>>>>> in the network stack alone cannot address the barrier delay.
> Was
> >>>>>>>>> there any
> >>>>>>>>>>>>> related discussion? One possible solution would be to drain
> >>>>> incoming
> >>>>>>>>> data
> >>>>>>>>>>>>> till the barrier and make it part of the checkpoint instead
> of
> >>>>>>>>> processing
> >>>>>>>>>>>>> it. This is somewhat related to asynchronous processing, but
> I'm
> >>>>>>>>> thinking
> >>>>>>>>>>>>> more of a solution that is automated in the Flink runtime for
> >>> the
> >>>>>>>>>>>>> backpressure scenario only.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>> https://flink.apache.org/2019/06/05/flink-network-stack.html
> >>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>
> >>>>>>>>>
> >>>>>
> >>>
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >
>
>
