@Thomas just to double-check:
- parallelism and configuration changes should be feasible on unaligned checkpoints
- changes in state types and JobGraph structure would be tricky, and changing the on-the-wire types would not be possible.
On Wed, Aug 14, 2019 at 7:48 PM Thomas Weise <t...@apache.org> wrote:

> On Wed, Aug 14, 2019 at 10:23 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
>
> > Thanks for these great points and discussions!
> >
> > 1. Regarding triggering checkpoint RPC calls to all the tasks, as in Chandy Lamport: it combines two different mechanisms to make sure that the trigger is fast in different scenarios. But in the Flink world it might not be worth trying that approach, as Stephan analyzed. Another concern is that it might put a heavy load on the JobMaster, which has to broadcast this checkpoint RPC to all the tasks of a large-scale job, especially with a very short checkpoint interval. Furthermore, it could delay the execution of other important RPCs and bring potential timeout risks.
> >
> > 2. I agree with the idea of borrowing "take state snapshot on first barrier" from Chandy Lamport instead of barrier alignment, combined with unaligned checkpoints in Flink.
> >
> > >>>> The benefit would be less latency increase in the channels which already have received barriers.
> > >>>> However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
> >
> > I think we will not have an adverse effect if we do not prioritize the inputs w/o barriers in this case. After the sync snapshot, the task could actually process any input channel. For the input channel receiving the first barrier, we already have an obvious boundary for persisting buffers. For the other channels w/o barriers, we could persist the following buffers of these channels until the barrier arrives over the network. This is because with credit-based flow control the barrier does not need credit to be transported, so as long as the sender lets the barrier overtake the output queue, the network stack transports this barrier immediately, regardless of the input conditions on the receiver side. So there is no requirement to consume the accumulated buffers in these channels with higher priority. If so, it seems that we will not waste any CPU cycles, as Piotr was concerned before.
>
> I'm not sure I follow this. For the checkpoint to complete, any buffer that arrived prior to the barrier would need to be part of the checkpointed state. So wouldn't it be important to finish persisting these buffers as fast as possible by prioritizing the respective inputs? The task won't be able to quickly process records from the inputs that have seen the barrier when it is already backpressured (or causing the backpressure).
>
> > 3. Suppose unaligned checkpoints perform well in practice: is it possible to make them the only mechanism for handling all the cases? I mean that in the non-backpressure scenario there are few or even no buffers in the input/output queues, so "overtaking barrier --> trigger snapshot on first barrier --> persist buffers" might still work well. Then we would not need to maintain two sets of mechanisms in the end.
> >
> > 4. The initial motivation of this discussion is the checkpoint timeout in the backpressure scenario. If we adjust the default timeout to a very large value, the checkpoint would never time out and we would only need to wait for it to finish. Are there still any other problems/concerns if a checkpoint takes a long time to finish? Although we already know of some issues, it is better to gather more user feedback to confirm which aspects could be solved by this feature design. E.g., the sink commit delay might not be covered by the unaligned solution.
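Zhijiang's argument in point 2 rests on one property of credit-based flow control as he describes it: data buffers are only sent when the receiver has announced credit, while a checkpoint barrier (an event) is sent without credit. The following is a minimal Python sketch of that property, assuming a toy single-channel sender; `CreditBasedSender`, `Buffer`, `Barrier` and their methods are hypothetical names for illustration, not Flink's network-stack API.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Buffer:
    payload: str


@dataclass
class Barrier:
    checkpoint_id: int


class CreditBasedSender:
    """Sender side of one channel under credit-based flow control (toy model).

    Assumption for illustration: each data buffer consumes one credit announced
    by the receiver, while events such as checkpoint barriers need no credit.
    """

    def __init__(self, credit: int = 0):
        self.credit = credit
        self.queue = deque()
        self.sent = []  # what has been handed to the network so far

    def enqueue(self, item) -> None:
        self.queue.append(item)

    def enqueue_barrier_first(self, barrier: Barrier) -> None:
        # The barrier overtakes the output queue; the buffers stay queued and
        # are still delivered later.
        self.queue.appendleft(barrier)

    def add_credit(self, n: int) -> None:
        self.credit += n

    def flush(self) -> None:
        while self.queue:
            head = self.queue[0]
            if isinstance(head, Barrier):
                self.sent.append(self.queue.popleft())  # no credit required
            elif self.credit > 0:
                self.credit -= 1
                self.sent.append(self.queue.popleft())
            else:
                break  # backpressure: data waits for credit


if __name__ == "__main__":
    sender = CreditBasedSender(credit=0)
    sender.enqueue(Buffer("a"))
    sender.enqueue(Buffer("b"))
    sender.enqueue_barrier_first(Barrier(7))
    sender.flush()
    print(sender.sent)  # [Barrier(checkpoint_id=7)]
```

With zero credit the barrier leaves immediately while both data buffers stay queued. In an unaligned checkpoint those overtaken buffers would additionally have to be persisted as channel state, which is the cost Thomas raises in his reply.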
>
> Checkpoints taking too long is the concern that sparked this discussion (the timeout is just a symptom). The slowness issue also applies to the savepoint use case. We would need to be able to take a savepoint quickly in order to roll forward a fix that can alleviate the backpressure (like changing parallelism or making a different configuration change).
>
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From: Stephan Ewen <se...@apache.org>
> > Send Time: Wednesday, August 14, 2019, 17:43
> > To: dev <dev@flink.apache.org>
> > Subject: Re: Checkpointing under backpressure
> >
> > Quick note: The current implementation is
> >
> > Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)
> >
> > On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > >
> > > Thanks for the great ideas so far.
> > >
> > > +1
> > >
> > > Regarding other things raised, I mostly agree with Stephan.
> > >
> > > I like the idea of simultaneously starting the checkpoint everywhere via an RPC call (especially in cases where Tasks are busy doing some synchronous operations, for example for tens of milliseconds. In that case every network exchange adds tens of milliseconds of delay in propagating the checkpoint). However, I agree that this might be a premature optimisation given the current state of our code (we already have checkpoint barriers).
> > >
> > > However, I like the idea of switching from:
> > >
> > > 1. A -> S -> F (Align -> snapshot -> forward markers)
> > >
> > > To
> > >
> > > 2. S -> F -> L (Snapshot -> forward markers -> log pending channels)
> > >
> > > Or even to
> > >
> > > 6. F -> S -> L (Forward markers -> snapshot -> log pending channels)
> > >
> > > It feels to me like this would decouple the propagation of checkpoints from the costs of synchronous snapshots and of waiting for all of the checkpoint barriers to arrive (even if they overtake in-flight records, this might take some time).
> > >
> > > > What I like about the Chandy Lamport approach (2.) initiated from sources is that:
> > > > - Snapshotting imposes no modification to normal processing.
> > >
> > > Yes, I agree that would be nice. Currently, during the alignment and blocking of the input channels, we might be wasting CPU cycles of upstream tasks. If we succeed in designing a new checkpointing mechanism that does not disrupt/block regular data processing (modulo the extra IO cost of logging the in-flight records), that would be a huge improvement.
> > >
> > > Piotrek
> > >
> > > > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote:
> > > >
> > > > Sure, I see. In cases where no periodic aligned snapshots are employed, this is the only option.
> > > >
> > > > Two things that have not been highlighted enough so far about the proposed protocol (my mails included):
> > > > - The recovery/reconfiguration strategy should strictly prioritise processing logged events before entering normal task input operation. Otherwise causality can be violated. This also means dataflow recovery is expected to be slower than recovery from an aligned snapshot.
> > > > - Same as with state capture, markers should be forwarded upon the first marker received on input. No later than that. Otherwise we have duplicate side effects.
> > > >
> > > > Thanks for the great ideas so far.
> > > >
> > > > Paris
> > > >
> > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
> > > >>
> > > >> Scaling with unaligned checkpoints might be a necessity.
> > > >>
> > > >> Let's assume the job failed due to a lost TaskManager, but no new TaskManager becomes available.
> > > >> In that case we need to scale down based on the latest complete checkpoint, because we cannot produce a new checkpoint.
> > > >>
> > > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com> wrote:
> > > >>
> > > >>> +1 I think we are on the same page, Stephan.
> > > >>>
> > > >>> Rescaling on unaligned checkpoints sounds challenging and a bit unnecessary, no?
> > > >>> Why not stick to aligned snapshots for live reconfiguration/rescaling?
> > > >>> It’s a pretty rare operation and it would simplify things a lot.
> > > >>> Everything can be “staged” upon alignment, including replacing channels and tasks.
> > > >>>
> > > >>> -Paris
> > > >>>
> > > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> > > >>>>
> > > >>>> Hi all!
> > > >>>>
> > > >>>> Yes, the first proposal of "unaligned checkpoints" (probably two years back now) drew major inspiration from Chandy Lamport, as did, actually, the original checkpointing algorithm.
> > > >>>>
> > > >>>> "Logging data between first and last barrier" versus "barrier jumping over buffers and storing those buffers" is pretty much the same.
> > > >>>> However, there are a few nice benefits of the proposal of unaligned checkpoints over Chandy-Lamport.
> > > >>>>
> > > >>>> *## Benefits of Unaligned Checkpoints*
> > > >>>>
> > > >>>> (1) It is very similar to the original algorithm (it can be seen as an optional feature purely in the network stack) and thus can share lots of code paths.
> > > >>>>
> > > >>>> (2) Less data stored. If we make the "jump over buffers" part timeout-based (for example, the barrier overtakes buffers if they are not flushed within 10ms), then in the common case of flowing pipelines checkpoints are aligned without in-flight data. Only backpressured cases store some in-flight data, which means we don't regress in the common case and only fix the backpressure case.
> > > >>>>
> > > >>>> (3) Faster checkpoints. Chandy Lamport still waits for all barriers to arrive naturally, logging on the way. If data processing is slow, this can still take quite a while.
> > > >>>>
> > > >>>> ==> I think both these points are strong reasons to not change the mechanism away from "trigger sources" to a CL-style "trigger all".
> > > >>>>
> > > >>>> *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*
> > > >>>>
> > > >>>> We can think about something like "take state snapshot on first barrier" and then store buffers until the other barriers arrive. Inside the network stack, barriers could still overtake and persist buffers.
> > > >>>> The benefit would be less latency increase in the channels which already have received barriers.
> > > >>>> However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
> > > >>>>
> > > >>>> *## Concerning upgrades*
> > > >>>>
> > > >>>> I think it is a fair restriction to say that upgrades need to happen on aligned checkpoints. It is a rare enough operation.
> > > >>>>
> > > >>>> *## Concerning re-scaling (changing parallelism)*
> > > >>>>
> > > >>>> We need to support that on unaligned checkpoints as well. There are several feature proposals about automatic scaling, especially downscaling in case of missing resources. The last snapshot might be a regular checkpoint, so all checkpoints need to support rescaling.
> > > >>>>
> > > >>>> *## Concerning end-to-end checkpoint duration and "trigger sources" versus "trigger all"*
> > > >>>>
> > > >>>> I think for the end-to-end checkpoint duration, an "overtake buffers" approach yields faster checkpoints, as mentioned above (Chandy Lamport logging still needs to wait for the barrier to flow).
> > > >>>>
> > > >>>> I don't see the benefit of a "trigger all tasks via RPC concurrently" approach. Bear in mind that it is still a globally coordinated approach and you need to wait for the global checkpoint to complete before committing any side effects.
> > > >>>> I believe that the checkpoint time is determined more by the state checkpoint writing and the global coordination and metadata commit than by the difference in alignment time between "trigger from source and jump over buffers" versus "trigger all tasks concurrently".
> > > >>>>
> > > >>>> Trying to optimize a few tens of milliseconds out of the network stack sends (and changing the overall checkpointing approach completely for that) while staying with a globally coordinated checkpoint will send us down a path to a dead end.
> > > >>>>
> > > >>>> To really bring task persistence latency down to 10s of milliseconds (so we can frequently commit in sinks), we need to take an approach without any global coordination. Tasks need to establish a persistent recovery point individually and at their own discretion; only then can it be frequent enough. To get there, they would need to decouple themselves from the predecessor and successor tasks (via something like persistent channels).
> > > >>>> This is a different discussion, though, somewhat orthogonal to this one here.
> > > >>>>
> > > >>>> Best,
> > > >>>> Stephan
> > > >>>>
> > > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > > >>>>
> > > >>>>> Hi again,
> > > >>>>>
> > > >>>>> Zhu Zhu, let me think about this more. Maybe, as Paris is writing, we do not need to block any channels at all, at least assuming credit-based flow control. What should happen with the following checkpoint is another question. Also, should we support concurrent checkpoints and subsuming checkpoints as we do now? Maybe not…
> > > >>>>>
> > > >>>>> Paris
> > > >>>>>
> > > >>>>> Re
> > > >>>>> I. 2. a) and b) - yes, this would have to be taken into account
> > > >>>>> I. 2. c) and IV. 2. - without those, end-to-end checkpoint time will probably be longer than it could be. It might affect external systems. For example Kafka, which automatically times out lingering transactions, and for us, the transaction time is equal to the time between two checkpoints.
> > > >>>>>
> > > >>>>> II 1. - I’m confused. To set things straight: Flink is currently snapshotting once it receives all of the checkpoint barriers from all of the input channels, and only then does it broadcast the checkpoint barrier down the stream. And this is correct from an exactly-once perspective.
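The current aligned behaviour Piotr describes can be sketched as a toy barrier-alignment loop for one task: an input that has delivered its barrier is blocked, and only once every input has delivered it does the task forward the barrier and take its snapshot. This is a minimal Python model, assuming the operator state is a running sum; `AlignedTask` and its methods are hypothetical names. The forward-then-snapshot order follows Stephan's quick note at 17:43 (Align -> Forward -> Sync Snapshot Part); as long as no record is processed between the two steps, the captured state is the same either way.

```python
from collections import deque


class AlignedTask:
    """Toy model of barrier alignment for one task (Align -> Forward -> Snapshot).

    Hypothetical names; the operator state is a running sum. Records that
    arrive on an already-aligned input are held in `stash`, standing in for
    the task not reading that input until alignment completes.
    """

    def __init__(self, inputs):
        self.inputs = set(inputs)
        self.blocked = set()   # inputs that already delivered the barrier
        self.stash = deque()   # held-back records from blocked inputs
        self.state = 0
        self.emitted = []      # records and barriers sent downstream
        self.snapshots = {}    # checkpoint id -> captured state

    def on_record(self, channel, value):
        if channel in self.blocked:
            self.stash.append(value)  # must not affect the pending snapshot
        else:
            self._process(value)

    def on_barrier(self, channel, checkpoint_id):
        self.blocked.add(channel)
        if self.blocked == self.inputs:                      # aligned
            self.emitted.append(("barrier", checkpoint_id))  # forward
            self.snapshots[checkpoint_id] = self.state       # sync snapshot
            self.blocked.clear()                             # unblock inputs
            while self.stash:
                self._process(self.stash.popleft())

    def _process(self, value):
        self.state += value
        self.emitted.append(("record", value))


if __name__ == "__main__":
    task = AlignedTask(["I1", "I2"])
    task.on_record("I1", 1)
    task.on_barrier("I1", 5)
    task.on_record("I1", 10)  # after I1's barrier: held back
    task.on_record("I2", 2)   # before I2's barrier: processed
    task.on_barrier("I2", 5)
    print(task.snapshots)     # {5: 3}
```

The record with value 10 waits until alignment completes; under backpressure this waiting is exactly what stretches checkpoint duration in the scenario this thread discusses.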
> > > >>>>>
> > > >>>>> As far as I understand, your proposal, based on the Chandy Lamport algorithm, is snapshotting the state of the operator on the first checkpoint barrier, which also looks correct to me.
> > > >>>>>
> > > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
> > > >>>>>
> > > >>>>> V. Yes, we still need aligned checkpoints, as they are easier for state migration and upgrades.
> > > >>>>>
> > > >>>>> Piotrek
> > > >>>>>
> > > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>> Now I see a little more clearly what you have in mind. Thanks for the explanation!
> > > >>>>>> There are a few intermixed concepts here, some to do with correctness, some with performance.
> > > >>>>>> Before delving deeper, I will just enumerate a few things to make myself a little more helpful if I can.
> > > >>>>>>
> > > >>>>>> I. Initiation
> > > >>>>>> -------------
> > > >>>>>>
> > > >>>>>> 1. RPC to sources only is a less intrusive way to initiate snapshots, since you make better use of pipeline parallelism (only a small subset of tasks is progressively running the protocol at a time; if snapshotting is async, the overall overhead might not even be observable).
> > > >>>>>>
> > > >>>>>> 2. If we really want an RPC-to-all initiation, take note of the following implications:
> > > >>>>>>
> > > >>>>>> a. (correctness) RPC calls are not guaranteed to arrive in every task before a marker from a preceding task.
> > > >>>>>>
> > > >>>>>> b. (correctness) Either the RPC call OR the first arriving marker should initiate the algorithm, whichever comes first. If you only do it per RPC call, then you capture a "late" state that includes side effects of already logged events.
> > > >>>>>>
> > > >>>>>> c. (performance) Lots of IO will be invoked at the same time on the backend store from all tasks. This might lead to high congestion in async snapshots.
> > > >>>>>>
> > > >>>>>> II. Capturing State First
> > > >>>>>> -------------------------
> > > >>>>>>
> > > >>>>>> 1. (correctness) Capturing state at the last marker sounds incorrect to me (state contains side effects of already logged events, based on the proposed scheme). This results in duplicate processing. No?
> > > >>>>>>
> > > >>>>>> III. Channel Blocking / "Alignment"
> > > >>>>>> -----------------------------------
> > > >>>>>>
> > > >>>>>> 1. (performance?) What is the added benefit? We don't want a "complete" transactional snapshot; async snapshots are purely for failure recovery. Thus, I don't see why this needs to be imposed at the expense of performance/throughput. With the proposed scheme, the whole dataflow enters snapshotting/logging mode anyway, so tasks more or less snapshot concurrently.
> > > >>>>>>
> > > >>>>>> IV. Marker Bypassing
> > > >>>>>> --------------------
> > > >>>>>>
> > > >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots, so on some quick thinking it is correct. I will try to model this later and get back to you in case I find something wrong.
> > > >>>>>>
> > > >>>>>> 2. (performance) It also sounds like a meaningful optimisation! I like thinking of this as a push-based snapshot, i.e., the producing task somehow triggers a downstream consumer/channel to capture its state. For example, consider T1 -> |marker t1| -> T2.
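Paris's point I.2.b (either the RPC or the first marker starts the snapshot, whichever comes first) amounts to an idempotent trigger. A minimal Python sketch, assuming a single task whose state is a running sum; `Task`, `on_rpc` and `on_marker` are hypothetical names, and the logging of pending channels is deliberately omitted to keep the focus on initiation.

```python
class Task:
    """Toy task whose snapshot starts on the RPC or the first marker,
    whichever arrives first (hypothetical names; state is a running sum)."""

    def __init__(self):
        self.state = 0
        self.snapshots = {}   # checkpoint id -> captured state
        self.started_by = {}  # checkpoint id -> "rpc" or "marker"
        self.forwarded = []   # checkpoint ids whose marker was forwarded

    def _start(self, checkpoint_id, cause):
        if checkpoint_id in self.snapshots:
            return  # already started; the later trigger is a no-op
        self.snapshots[checkpoint_id] = self.state
        self.started_by[checkpoint_id] = cause
        self.forwarded.append(checkpoint_id)  # forward markers right away

    def on_rpc(self, checkpoint_id):
        self._start(checkpoint_id, "rpc")

    def on_marker(self, checkpoint_id):
        self._start(checkpoint_id, "marker")

    def on_record(self, value):
        self.state += value


if __name__ == "__main__":
    task = Task()
    task.on_record(1)
    task.on_marker(7)   # marker wins the race
    task.on_record(5)   # processed after the snapshot point
    task.on_rpc(7)      # late RPC: ignored
    print(task.snapshots[7], task.started_by[7], task.state)  # 1 marker 6
```

A snapshot started only by the late RPC would have captured 6, i.e. the "late" state that already contains the side effects of a record Paris's scheme would also have logged.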
> > > >>>>>>
> > > >>>>>> V. Usage of "Async" Snapshots
> > > >>>>>> -----------------------------
> > > >>>>>>
> > > >>>>>> 1. Do you see this as a full replacement of "full" aligned snapshots/savepoints? In my view async snapshots will be needed from time to time but not as frequently. Yet, it seems like a valid approach solely for failure recovery on the same configuration. Here's why:
> > > >>>>>>
> > > >>>>>> a. With the original snapshotting there is a strong duality between a stream input (offsets) and committed side effects (internal states and external commits to transactional sinks). In the async version, by contrast, there are uncommitted operations (in-flight records). Thus, you cannot use these snapshots for, e.g., submitting SQL queries with snapshot isolation. Also, the original snapshotting gives Flink a lot of potential to make proper transactional commits externally.
> > > >>>>>>
> > > >>>>>> b. Reconfiguration is very tricky; you probably know that better. In-flight channel state is no longer valid in a new configuration (i.e., new dataflow graph, new operators, updated operator logic, different channels, different parallelism).
> > > >>>>>>
> > > >>>>>> 2. Async snapshots can also be potentially useful for monitoring the general health of a dataflow, since the task manager can analyze them to assess the general performance of a job graph and, for example, spot bottlenecks.
> > > >>>>>>
> > > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Thomas:
> > > >>>>>>> There are no Jira tickets yet (or maybe there is something very old somewhere). First we want to discuss it, next present a FLIP, and at last create tickets :)
> > > >>>>>>>
> > > >>>>>>>> if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives
> > > >>>>>>>
> > > >>>>>>> I would guess that it would be better to block the reads, unless we can already process the records from the blocked channel…
> > > >>>>>>>
> > > >>>>>>> Paris:
> > > >>>>>>>
> > > >>>>>>> Thanks for the explanation, Paris. I’m starting to understand this more, and I like the idea of snapshotting the state of an operator before receiving all of the checkpoint barriers: this would allow more things to happen at the same time instead of sequentially. As Zhijiang has pointed out, there are some things not considered in your proposal, such as overtaking output buffers, but maybe those things could be incorporated together.
> > > >>>>>>>
> > > >>>>>>> Another thing is that from the wiki description I understood that the initial checkpointing is not initialised by any checkpoint barrier, but by an independent call/message from the Observer. I haven’t played with this idea a lot, but I had some discussion with Nico and it seems that it might work:
> > > >>>>>>>
> > > >>>>>>> 1. JobManager sends an RPC “start checkpoint” to all tasks
> > > >>>>>>> 2. Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a snapshot of its state and:
> > > >>>>>>> a) broadcasts a checkpoint barrier down the stream to all channels (let’s ignore for a moment the potential for this barrier to overtake the buffered output data)
> > > >>>>>>> b) for any input channel from which it hasn’t yet received a checkpoint barrier, the data are added to the checkpoint
> > > >>>>>>> c) once a channel (for example l1) receives a checkpoint barrier, the Task blocks reads from that channel (?)
> > > >>>>>>> d) after all remaining channels (l2) receive checkpoint barriers, the Task first has to process the buffered data; after that it can unblock the reads from the channels
> > > >>>>>>>
> > > >>>>>>> Checkpoint barriers do not cascade/flow through different tasks here. A checkpoint barrier emitted from Task1 reaches only the immediate downstream Tasks. Thanks to this setup, the total checkpointing time is not the sum of the checkpointing times of all Tasks one by one, but more or less the max of the slowest Tasks. Right?
> > > >>>>>>>
> > > >>>>>>> A couple of intriguing thoughts:
> > > >>>>>>> 3. checkpoint barriers overtaking the output buffers
> > > >>>>>>> 4. can we keep processing some data (in order to not waste CPU cycles) after we have taken the snapshot of the Task? I think we could.
> > > >>>>>>>
> > > >>>>>>> Piotrek
> > > >>>>>>>
> > > >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Great discussion! I'm excited that this is already under consideration! Are there any JIRAs or other traces of discussion to follow?
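Piotr's "sum versus max" claim in the mail above can be made concrete with an idealised calculation. Assume, hypothetically, a chain of tasks where each task needs some time before it can handle and forward the barrier (for example, a synchronous operation of tens of milliseconds, as in Piotr's 5:21 PM mail). With barriers cascading through the chain these delays add up, while with a concurrent RPC to every task the slowest task dominates. The model ignores network transfer, alignment, asynchronous state upload and global coordination, which Stephan argues dominate checkpoint time in practice; the function names are illustrative only.

```python
from typing import Sequence


def cascaded_ms(handling_delay_ms: Sequence[int]) -> int:
    # The barrier reaches task i only after tasks 0..i-1 handled and forwarded it.
    return sum(handling_delay_ms)


def concurrent_ms(handling_delay_ms: Sequence[int]) -> int:
    # Every task starts at t=0 (RPC to all), so the slowest task dominates.
    return max(handling_delay_ms)


if __name__ == "__main__":
    chain = [30, 10, 45, 20]  # hypothetical per-task delays in milliseconds
    print(cascaded_ms(chain), concurrent_ms(chain))  # 105 45
```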
> > > >>>>>>>>
> > > >>>>>>>> Paris, if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives? This is intriguing. But there is probably also a benefit to not continuing to read I1, since that could speed up retrieval from I2. Also, if the user code is the cause of backpressure, this would avoid pumping more data into the process function.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Thomas
> > > >>>>>>>>
> > > >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Paris,
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks for the detailed sharing. I think it is very similar to the overtaking approach we proposed before.
> > > >>>>>>>>>
> > > >>>>>>>>> There are some tiny differences:
> > > >>>>>>>>> The overtaking approach might need to snapshot all the input/output queues. Chandy Lamport seems to only need to snapshot (n-1) input channels after the first barrier arrives, which might reduce the state size a bit. But normally there should be fewer buffers in the first input channel with a barrier.
> > > >>>>>>>>> The output barrier still follows the regular data stream in Chandy Lamport, the same way as in current Flink. For the overtaking approach, we need to make extra effort to let the barrier be transported ahead of the output queue on the upstream side, and to change barrier alignment to be based on receiving instead of the current reading on the downstream side.
> > > >>>>>>>>> In backpressure caused by data skew, the first barrier in an almost empty input channel should arrive much earlier than the barrier in the last, heavily loaded input channel, so Chandy Lamport could benefit a lot. But in the case where all input channels are balanced and heavily loaded, even the first barrier might still take a long time to arrive, and then the overtaking approach could still fit well to speed up checkpoints.
> > > >>>>>>>>> Anyway, your suggestion is helpful on my side, especially considering some implementation details.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Zhijiang
> > > >>>>>>>>> ------------------------------------------------------------------
> > > >>>>>>>>> From: Paris Carbone <seniorcarb...@gmail.com>
> > > >>>>>>>>> Send Time: Tuesday, August 13, 2019, 14:03
> > > >>>>>>>>> To: dev <dev@flink.apache.org>
> > > >>>>>>>>> Cc: zhijiang <wangzhijiang...@aliyun.com>
> > > >>>>>>>>> Subject: Re: Checkpointing under backpressure
> > > >>>>>>>>>
> > > >>>>>>>>> yes! It’s quite similar, I think. Though mind that the devil is in the details, i.e., the temporal order in which actions are taken.
> > > >>>>>>>>>
> > > >>>>>>>>> To clarify, let us say you have a task T with two input channels I1 and I2. The Chandy Lamport execution flow is the following:
> > > >>>>>>>>>
> > > >>>>>>>>> 1) T receives a barrier from I1 and...
> > > >>>>>>>>> 2) ...the following three actions happen atomically:
> > > >>>>>>>>> I) T snapshots its state T*
> > > >>>>>>>>> II) T forwards the marker to its outputs
> > > >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer M*
> > > >>>>>>>>> - Also notice here that T does NOT block I1 as it does in aligned snapshots -
> > > >>>>>>>>> 3) Eventually T receives the barrier from I2 and stops recording events. Its asynchronously captured snapshot is now complete: {T*,M*}.
> > > >>>>>>>>> Upon recovery, all messages of M* should be replayed in FIFO order.
> > > >>>>>>>>>
> > > >>>>>>>>> With this approach, alignment does not create a deadlock situation, since 2.II happens asynchronously anyway and messages can be logged asynchronously as well during the process of the snapshot. If there is back-pressure in a pipeline, the cause is most probably not this algorithm.
> > > >>>>>>>>>
> > > >>>>>>>>> Back to your observation, the answer: yes and no. In your network model, I can see the logic of “logging” and “committing” a final snapshot being provided by the channel implementation. However, do mind that the first barrier always needs to go “all the way” to initiate the Chandy Lamport algorithm logic.
> > > >>>>>>>>>
> > > >>>>>>>>> The above flow has been proven using temporal logic in my PhD thesis, in case you are interested in the proof.
> > > >>>>>>>>> I hope this helps clarify things a little. Let me know if there is any confusing point to disambiguate. I would be more than happy to help if I can.
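The three-step flow Paris describes above can be sketched for a task with any number of inputs. This is a minimal Python model, assuming a single checkpoint in flight and a running-sum operator state; `ChandyLamportTask` and its methods are hypothetical names. No input is ever blocked: records on a still-pending input are both processed and logged into M*.

```python
from collections import deque


class ChandyLamportTask:
    """Toy model of the flow above for one task T with several inputs.

    Hypothetical names; the operator state is a running sum and only one
    checkpoint is in flight at a time.
    """

    def __init__(self, inputs):
        self.inputs = set(inputs)
        self.state = 0
        self.outputs = []           # records and markers emitted downstream
        self.state_snapshot = None  # T*
        self.channel_log = deque()  # M*: events from inputs still pending
        self.pending = set()        # inputs whose marker has not arrived yet
        self.completed = None       # (T*, M*) once every marker has arrived

    def on_marker(self, channel, checkpoint_id):
        if self.state_snapshot is None:
            # Step 2, done before any further record is processed:
            self.state_snapshot = self.state                # I)   T*
            self.outputs.append(("marker", checkpoint_id))  # II)  forward
            self.pending = self.inputs - {channel}          # III) log the rest
        else:
            self.pending.discard(channel)                   # step 3
        if not self.pending:
            self.completed = (self.state_snapshot, list(self.channel_log))

    def on_record(self, channel, value):
        if channel in self.pending:
            self.channel_log.append((channel, value))  # kept in arrival order
        # No input is blocked: every record is processed right away.
        self.state += value
        self.outputs.append(("record", value))


if __name__ == "__main__":
    task = ChandyLamportTask(["I1", "I2"])
    task.on_record("I1", 1)
    task.on_record("I2", 2)
    task.on_marker("I1", 1)    # first marker: T* = 3
    task.on_record("I1", 10)   # I1 already delivered its marker: not logged
    task.on_record("I2", 20)   # I2 still pending: processed and logged
    task.on_marker("I2", 1)    # last marker: snapshot complete
    print(task.completed)      # (3, [('I2', 20)])
```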
> > > >>>>>>>>>
> > > >>>>>>>>> Paris
> > > >>>>>>>>>
> > > >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots, don’t you still have to wait for the “checkpoint barrier” to arrive in order to know when you have already received all possible messages from the upstream tasks/operators? So instead of processing the “in flight” messages (as Flink is doing currently), you are sending them to an “observer”?
> > > >>>>>>>>>>
> > > >>>>>>>>>> In that case, that sounds similar to “checkpoint barriers overtaking in-flight records” (aka unaligned checkpoints). Just for us, the observer is a snapshot state.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Piotrek
> > > >>>>>>>>>>
> > > >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Interesting problem! Thanks for bringing it up, Thomas.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly without sacrificing correctness.
> > > >>>>>>>>>>> - They do not need alignment, only (async) logging of in-flight records from the time the first barrier is processed until the last barrier arrives in a task.
> > > >>>>>>>>>>> - They work fine for failure recovery as long as logged records are replayed on startup.
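Paris's recovery requirement (replay the logged records of M* in FIFO order and, as his 14:56 mail stresses, strictly before normal input resumes) can be sketched as a generator that restores T* and fixes the processing order. A minimal Python sketch, assuming records are (channel, value) pairs and the state is a running sum; `recover` is a hypothetical helper, not a Flink API.

```python
from typing import Iterable, Iterator, Tuple

Record = Tuple[str, int]  # (input channel, value); hypothetical shape


def recover(state_snapshot: int,
            channel_log: Iterable[Record],
            live_input: Iterable[Record]) -> Iterator[Tuple[str, int]]:
    """Yield (origin, state after the record) in post-recovery processing order."""
    state = state_snapshot                # restore T*
    for _channel, value in channel_log:   # 1) replay M* first, in FIFO order
        state += value
        yield "replayed", state
    for _channel, value in live_input:    # 2) only then resume normal input
        state += value
        yield "live", state


if __name__ == "__main__":
    steps = list(recover(3, [("I2", 20)], [("I1", 100)]))
    print(steps)  # [('replayed', 23), ('live', 123)]
```

Draining M* before touching live input keeps each channel's records in their original order; interleaving them could let a post-marker record of a channel overtake a logged pre-marker one, which is the causality violation Paris warns about.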
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Flink’s “aligned” savepoints would probably still be necessary for transactional sink commits + any sort of reconfiguration (e.g., rescaling, updating the logic of operators to evolve an application, etc.).
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I don’t completely understand the “overtaking” approach, but if you have a concrete definition I would be happy to check it out and help if I can!
> > > >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging things in pending channels in a task snapshot before the barrier arrives.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> -Paris
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Thomas,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of discussing how to address this issue, and one of the solutions that we are discussing is exactly what you are proposing: checkpoint barriers overtaking the in-flight data and making the in-flight data part of the checkpoint.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If everything works well, we will be able to present the results of our discussions on the dev mailing list soon.
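The idea of checkpoint barriers overtaking in-flight data, with that data becoming part of the checkpoint, can be shown with a toy task that has one input queue and one output queue. The function name, the `BARRIER` placeholder and the dict layout of the checkpoint are assumptions for illustration only; the actual design was still under discussion at this point in the thread.

```python
import copy
from collections import deque

BARRIER = "BARRIER"  # placeholder marker, not a Flink type


def unaligned_checkpoint(operator_state, input_queue, output_queue):
    """Toy unaligned checkpoint for a task with one input and one output queue.

    The barrier is handled as soon as it arrives, overtaking the buffers
    still queued ahead of it. Copies of those in-flight input and output
    buffers become part of the checkpoint; the buffers themselves stay
    queued and are processed normally afterwards.
    """
    checkpoint = {
        "operator": copy.deepcopy(operator_state),
        "input_buffers": list(input_queue),
        "output_buffers": list(output_queue),
    }
    # The barrier also overtakes the output queue: it is sent downstream
    # ahead of data that was already waiting there.
    output_queue.appendleft(BARRIER)
    return checkpoint


inputs = deque(["b1", "b2", "b3"])  # received before the barrier, not yet processed
outputs = deque(["o1"])             # produced, not yet sent
cp = unaligned_checkpoint({"count": 7}, inputs, outputs)
print(cp["input_buffers"], cp["output_buffers"], list(outputs))
# ['b1', 'b2', 'b3'] ['o1'] ['BARRIER', 'o1']
```

The checkpoint no longer waits for b1..b3 to be processed; the price is that the checkpoint grows with the amount of in-flight data it has to persist.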
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Thomas,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for raising this concern. Barrier alignment takes a long time in the backpressure case, which could cause several problems:
> > > >>>>>>>>>>>>> 1. Checkpoint timeouts, as you mentioned.
> > > >>>>>>>>>>>>> 2. High recovery cost on failover, because much data needs to be replayed.
> > > >>>>>>>>>>>>> 3. High delay for commit-based sinks in exactly-once mode.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> With credit-based flow control since release-1.5, the amount of in-flight buffers before barrier alignment is reduced, so we get some benefit in checkpoint speed.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> In release-1.8, I guess we did not in practice suspend the channels which had already received the barrier. But we did do a similar thing to speed up barrier alignment before; I am not quite sure whether release-1.8 covers this feature. There were some relevant discussions under JIRA [1].
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> For release-1.10, the community is now discussing the feature of unaligned checkpoints, which is mainly meant to resolve the above concerns. The basic idea is to let the barrier overtake the output/input buffer queues to speed up alignment, and to snapshot the input/output buffers as part of the checkpoint state.
> > > >>>>>>>>>>>>> The details have not been confirmed yet and are still under discussion. We hope to make some improvements for release-1.10.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Zhijiang
> > > >>>>>>>>>>>>> ------------------------------------------------------------------
> > > >>>>>>>>>>>>> From: Thomas Weise <t...@apache.org>
> > > >>>>>>>>>>>>> Sent: Monday, 12 August 2019, 21:38
> > > >>>>>>>>>>>>> To: dev <dev@flink.apache.org>
> > > >>>>>>>>>>>>> Subject: Checkpointing under backpressure
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> One of the major operational difficulties we observe with Flink is checkpoint timeouts under backpressure. I'm looking for both confirmation of my understanding of the current behavior and pointers for future improvement work:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Prior to the introduction of credit-based flow control in the network stack [1] [2], checkpoint barriers would back up with the data for all logical channels due to TCP backpressure. Since Flink 1.5, the buffers are controlled per channel, and checkpoint barriers are only held back for channels that have backpressure, while others can continue processing normally. However, checkpoint barriers still cannot "overtake data", so checkpoint alignment remains affected for the channel with backpressure, with the potential for slow checkpointing and timeouts.
> > > >>>>>>>>>>>>> That said, the delay of barriers would be capped by the maximum number of in-transit buffers per channel, which is an improvement compared to previous versions of Flink. Also, backpressure-based checkpoint alignment can help the barrier advance faster on the receiver side (by suspending channels that have already delivered the barrier). Is that accurate as of Flink 1.8?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> What appears to be missing to completely unblock checkpointing is a mechanism for checkpoints to overtake the data. That would help in situations where the processing itself is the bottleneck and prioritization in the network stack alone cannot address the barrier delay. Was there any related discussion? One possible solution would be to drain incoming data up to the barrier and make it part of the checkpoint instead of processing it. This is somewhat related to asynchronous processing, but I'm thinking more of a solution that is automated in the Flink runtime for the backpressure scenario only.
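The claim that barrier delay is capped by the in-transit buffers per channel can be turned into a rough estimate. This is a back-of-the-envelope model with explicit assumptions: the buffer count, buffer size and drain rate in the example are made-up values, not Flink defaults, and sender-side queued buffers or multiple hops in a pipeline would add to the figure.

```python
def max_barrier_delay_s(buffers_in_flight: int, buffer_size_bytes: int,
                        drain_rate_bytes_per_s: float) -> float:
    """Rough upper bound on how long a barrier waits behind queued data on one channel.

    Assumes flow control limits the data ahead of the barrier to
    `buffers_in_flight` full buffers, and that the receiving task drains
    this channel at a steady `drain_rate_bytes_per_s`.
    """
    queued_bytes = buffers_in_flight * buffer_size_bytes
    return queued_bytes / drain_rate_bytes_per_s


# Hypothetical values: 10 buffers of 32 KiB, drained at 64 KiB/s under backpressure.
print(max_barrier_delay_s(10, 32 * 1024, 64 * 1024))  # 5.0
```

The bound shrinks with fewer in-flight buffers but grows as processing slows down, which is why flow control alone cannot remove the delay when processing itself is the bottleneck.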
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>> Thomas
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
> > > >>>>>>>>>>>>> [2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn