Quick note: the current implementation order is Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part).
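For orientation, here is a rough, hypothetical sketch (plain Java; InputChannel, OperatorState, Task, etc. are made-up names, not Flink's actual classes) contrasting that current order with the Chandy-Lamport-style reorderings proposed in Piotr's mail below (2. S -> F -> L and 6. F -> S -> L): snapshot and forward on the first barrier, then log the channels whose barrier is still pending instead of blocking them.

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    // Illustrative only -- none of these types exist in Flink.
    class CheckpointOrderSketch {

        interface InputChannel { void block(); void unblock(); String id(); }
        interface Output { void broadcastBarrier(long checkpointId); }
        interface OperatorState { OperatorState copySync(); }

        static class Task {
            List<InputChannel> inputs;
            Output output;
            OperatorState state;
            final Set<String> barrierSeen = new HashSet<>();
            final Queue<Object> loggedRecords = new ArrayDeque<>();

            // Current order (per the quick note above):
            // Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part).
            void onBarrierAligned(long checkpointId, InputChannel from) {
                from.block();                                   // Align: stop reading this channel
                barrierSeen.add(from.id());
                if (barrierSeen.size() < inputs.size()) {
                    return;                                     // still waiting for the other barriers
                }
                output.broadcastBarrier(checkpointId);          // Forward markers downstream
                OperatorState snapshot = state.copySync();      // Sync snapshot part
                CompletableFuture.runAsync(() -> persist(snapshot)); // Async snapshot part
                inputs.forEach(InputChannel::unblock);
                barrierSeen.clear();
            }

            // Chandy-Lamport-style order (options 2/6 in the mail below): snapshot and
            // forward on the FIRST barrier, never block, and log records from channels
            // whose barrier is still pending (see onRecord).
            void onBarrierUnaligned(long checkpointId, InputChannel from) {
                if (barrierSeen.isEmpty()) {
                    output.broadcastBarrier(checkpointId);           // F: forward markers
                    OperatorState snapshot = state.copySync();       // S: snapshot state
                    CompletableFuture.runAsync(() -> persist(snapshot));
                }
                barrierSeen.add(from.id());
                if (barrierSeen.size() == inputs.size()) {           // last barrier has arrived
                    Queue<Object> channelState = new ArrayDeque<>(loggedRecords);
                    CompletableFuture.runAsync(() -> persist(channelState)); // in-flight part of the checkpoint
                    loggedRecords.clear();
                    barrierSeen.clear();
                }
            }

            void onRecord(Object record, InputChannel from) {
                // L: while a snapshot is in progress, records from channels that have not
                // delivered their barrier yet become part of the checkpoint...
                if (!barrierSeen.isEmpty() && !barrierSeen.contains(from.id())) {
                    loggedRecords.add(record);
                }
                process(record); // ...but processing is never blocked in the unaligned variant
            }

            void process(Object record) { /* user function */ }
            void persist(Object toPersist) { /* hand over to the state backend */ }
        }
    }

The trade-off discussed in the thread below is exactly between these two paths: the aligned one blocks back-pressured channels until all barriers arrive, while the logging one never blocks but has to persist (and later replay) the logged in-flight records.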
On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote: > > Thanks for the great ideas so far. > > +1 > > Regarding other things raised, I mostly agree with Stephan. > > I like the idea of simultaneously starting the checkpoint everywhere via > RPC call (especially in cases where Tasks are busy doing some synchronous > operations for example for tens of milliseconds. In that case every network > exchange adds tens of milliseconds of delay in propagating the checkpoint). > However I agree that this might be a premature optimisation assuming the > current state of our code (we already have checkpoint barriers). > > However I like the idea of switching from: > > 1. A -> S -> F (Align -> snapshot -> forward markers) > > To > > 2. S -> F -> L (Snapshot -> forward markers -> log pending channels) > > Or even to > > 6. F -> S -> L (Forward markers -> snapshot -> log pending channels) > > It feels to me like this would decouple propagation of checkpoints from > costs of synchronous snapshots and waiting for all of the checkpoint > barriers to arrive (even if they will overtake in-flight records, this > might take some time). > > > What I like about the Chandy Lamport approach (2.) initiated from > sources is that: > > - Snapshotting imposes no modification to normal processing. > > Yes, I agree that would be nice. Currently, during the alignment and > blocking of the input channels, we might be wasting CPU cycles of up stream > tasks. If we succeed in designing new checkpointing mechanism to not > disrupt/block regular data processing (% the extra IO cost for logging the > in-flight records), that would be a huge improvement. > > Piotrek > > > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote: > > > > Sure I see. In cases when no periodic aligned snapshots are employed > this is the only option. > > > > Two things that were not highlighted enough so far on the proposed > protocol (included my mails): > > - The Recovery/Reconfiguration strategy should strictly prioritise > processing logged events before entering normal task input operation. > Otherwise causality can be violated. This also means dataflow recovery will > be expected to be slower to the one employed on an aligned snapshot. > > - Same as with state capture, markers should be forwarded upon > first marker received on input. No later than that. Otherwise we have > duplicate side effects. > > > > Thanks for the great ideas so far. > > > > Paris > > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote: > >> > >> Scaling with unaligned checkpoints might be a necessity. > >> > >> Let's assume the job failed due to a lost TaskManager, but no new > >> TaskManager becomes available. > >> In that case we need to scale down based on the latest complete > checkpoint, > >> because we cannot produce a new checkpoint. > >> > >> > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com> > >> wrote: > >> > >>> +1 I think we are on the same page Stephan. > >>> > >>> Rescaling on unaligned checkpoint sounds challenging and a bit > >>> unnecessary. No? > >>> Why not sticking to aligned snapshots for live > reconfiguration/rescaling? > >>> It’s a pretty rare operation and it would simplify things by a lot. > >>> Everything can be “staged” upon alignment including replacing channels > and > >>> tasks. > >>> > >>> -Paris > >>> > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote: > >>>> > >>>> Hi all! 
> >>>> > >>>> Yes, the first proposal of "unaligend checkpoints" (probably two years > >>> back > >>>> now) drew a major inspiration from Chandy Lamport, as did actually the > >>>> original checkpointing algorithm. > >>>> > >>>> "Logging data between first and last barrier" versus "barrier jumping > >>> over > >>>> buffer and storing those buffers" is pretty close same. > >>>> However, there are a few nice benefits of the proposal of unaligned > >>>> checkpoints over Chandy-Lamport. > >>>> > >>>> *## Benefits of Unaligned Checkpoints* > >>>> > >>>> (1) It is very similar to the original algorithm (can be seen an an > >>>> optional feature purely in the network stack) and thus can share > lot's of > >>>> code paths. > >>>> > >>>> (2) Less data stored. If we make the "jump over buffers" part timeout > >>> based > >>>> (for example barrier overtakes buffers if not flushed within 10ms) > then > >>>> checkpoints are in the common case of flowing pipelines aligned > without > >>>> in-flight data. Only back pressured cases store some in-flight data, > >>> which > >>>> means we don't regress in the common case and only fix the back > pressure > >>>> case. > >>>> > >>>> (3) Faster checkpoints. Chandy Lamport still waits for all barriers to > >>>> arrive naturally, logging on the way. If data processing is slow, this > >>> can > >>>> still take quite a while. > >>>> > >>>> ==> I think both these points are strong reasons to not change the > >>>> mechanism away from "trigger sources" and start with CL-style "trigger > >>> all". > >>>> > >>>> > >>>> *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints* > >>>> > >>>> We can think about something like "take state snapshot on first > barrier" > >>>> and then store buffers until the other barriers arrive. Inside the > >>> network > >>>> stack, barriers could still overtake and persist buffers. > >>>> The benefit would be less latency increase in the channels which > already > >>>> have received barriers. > >>>> However, as mentioned before, not prioritizing the inputs from which > >>>> barriers are still missing can also have an adverse effect. > >>>> > >>>> > >>>> *## Concerning upgrades* > >>>> > >>>> I think it is a fair restriction to say that upgrades need to happen > on > >>>> aligned checkpoints. It is a rare enough operation. > >>>> > >>>> > >>>> *## Concerning re-scaling (changing parallelism)* > >>>> > >>>> We need to support that on unaligned checkpoints as well. There are > >>> several > >>>> feature proposals about automatic scaling, especially down scaling in > >>> case > >>>> of missing resources. The last snapshot might be a regular > checkpoint, so > >>>> all checkpoints need to support rescaling. > >>>> > >>>> > >>>> *## Concerning end-to-end checkpoint duration and "trigger sources" > >>> versus > >>>> "trigger all"* > >>>> > >>>> I think for the end-to-end checkpoint duration, an "overtake buffers" > >>>> approach yields faster checkpoints, as mentioned above (Chandy Lamport > >>>> logging still needs to wait for barrier to flow). > >>>> > >>>> I don't see the benefit of a "trigger all tasks via RPC concurrently" > >>>> approach. Bear in mind that it is still a globally coordinated > approach > >>> and > >>>> you need to wait for the global checkpoint to complete before > committing > >>>> any side effects. 
> >>>> I believe that the checkpoint time is more determined by the state > >>>> checkpoint writing, and the global coordination and metadata commit, > than > >>>> by the difference in alignment time between "trigger from source and > jump > >>>> over buffers" versus "trigger all tasks concurrently". > >>>> > >>>> Trying to optimize a few tens of milliseconds out of the network stack > >>>> sends (and changing the overall checkpointing approach completely for > >>> that) > >>>> while staying with a globally coordinated checkpoint will send us > down a > >>>> path to a dead end. > >>>> > >>>> To really bring task persistence latency down to 10s of milliseconds > (so > >>> we > >>>> can frequently commit in sinks), we need to take an approach without > any > >>>> global coordination. Tasks need to establish a persistent recovery > point > >>>> individually and at their own discretion, only then can it be frequent > >>>> enough. To get there, they would need to decouple themselves from the > >>>> predecessor and successor tasks (via something like persistent > channels). > >>>> This is a different discussion, though, somewhat orthogonal to this > one > >>>> here. > >>>> > >>>> Best, > >>>> Stephan > >>>> > >>>> > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> > >>> wrote: > >>>> > >>>>> Hi again, > >>>>> > >>>>> Zhu Zhu let me think about this more. Maybe as Paris is writing, we > do > >>> not > >>>>> need to block any channels at all, at least assuming credit base flow > >>>>> control. Regarding what should happen with the following checkpoint > is > >>>>> another question. Also, should we support concurrent checkpoints and > >>>>> subsuming checkpoints as we do now? Maybe not… > >>>>> > >>>>> Paris > >>>>> > >>>>> Re > >>>>> I. 2. a) and b) - yes, this would have to be taken into an account > >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint time will > >>>>> probably be longer than it could be. It might affect external > systems. > >>> For > >>>>> example Kafka, which automatically time outs lingering transactions, > and > >>>>> for us, the transaction time is equal to the time between two > >>> checkpoints. > >>>>> > >>>>> II 1. - I’m confused. To make things straight. Flink is currently > >>>>> snapshotting once it receives all of the checkpoint barriers from > all of > >>>>> the input channels and only then it broadcasts the checkpoint barrier > >>> down > >>>>> the stream. And this is correct from exactly-once perspective. > >>>>> > >>>>> As far as I understand, your proposal based on Chandy Lamport > algorithm, > >>>>> is snapshotting the state of the operator on the first checkpoint > >>> barrier, > >>>>> which also looks correct to me. > >>>>> > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about > this. > >>>>> > >>>>> V. Yes, we still need aligned checkpoints, as they are easier for > state > >>>>> migration and upgrades. > >>>>> > >>>>> Piotrek > >>>>> > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> > >>> wrote: > >>>>>> > >>>>>> Now I see a little more clearly what you have in mind. Thanks for > the > >>>>> explanation! > >>>>>> There are a few intermixed concepts here, some how to do with > >>>>> correctness some with performance. > >>>>>> Before delving deeper I will just enumerate a few things to make > myself > >>>>> a little more helpful if I can. > >>>>>> > >>>>>> I. Initiation > >>>>>> ------------- > >>>>>> > >>>>>> 1. 
RPC to sources only is a less intrusive way to initiate snapshots > >>>>> since you utilize better pipeline parallelism (only a small subset of > >>> tasks > >>>>> is running progressively the protocol at a time, if snapshotting is > >>> async > >>>>> the overall overhead might not even be observable). > >>>>>> > >>>>>> 2. If we really want an RPC to all initiation take notice of the > >>>>> following implications: > >>>>>> > >>>>>> a. (correctness) RPC calls are not guaranteed to arrive in every > >>>>> task before a marker from a preceding task. > >>>>>> > >>>>>> b. (correctness) Either the RPC call OR the first arriving marker > >>>>> should initiate the algorithm. Whichever comes first. If you only do > it > >>> per > >>>>> RPC call then you capture a "late" state that includes side effects > of > >>>>> already logged events. > >>>>>> > >>>>>> c. (performance) Lots of IO will be invoked at the same time on > >>>>> the backend store from all tasks. This might lead to high congestion > in > >>>>> async snapshots. > >>>>>> > >>>>>> II. Capturing State First > >>>>>> ------------------------- > >>>>>> > >>>>>> 1. (correctness) Capturing state at the last marker sounds > incorrect to > >>>>> me (state contains side effects of already logged events based on the > >>>>> proposed scheme). This results into duplicate processing. No? > >>>>>> > >>>>>> III. Channel Blocking / "Alignment" > >>>>>> ----------------------------------- > >>>>>> > >>>>>> 1. (performance?) What is the added benefit? We dont want a > "complete" > >>>>> transactional snapshot, async snapshots are purely for > failure-recovery. > >>>>> Thus, I dont see why this needs to be imposed at the expense of > >>>>> performance/throughput. With the proposed scheme the whole dataflow > >>> anyway > >>>>> enters snapshotting/logging mode so tasks more or less snapshot > >>>>> concurrently. > >>>>>> > >>>>>> IV Marker Bypassing > >>>>>> ------------------- > >>>>>> > >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots so > with > >>>>> some quick thinking correct. I will try to model this later and get > >>> back > >>>>> to you in case I find something wrong. > >>>>>> > >>>>>> 2. (performance) It also sounds like a meaningful optimisation! I > like > >>>>> thinking of this as a push-based snapshot. i.e., the producing task > >>> somehow > >>>>> triggers forward a consumer/channel to capture its state. By example > >>>>> consider T1 -> |marker t1| -> T2. > >>>>>> > >>>>>> V. Usage of "Async" Snapshots > >>>>>> --------------------- > >>>>>> > >>>>>> 1. Do you see this as a full replacement of "full" aligned > >>>>> snapshots/savepoints? In my view async shanpshots will be needed from > >>> time > >>>>> to time but not as frequently. Yet, it seems like a valid approach > >>> solely > >>>>> for failure-recovery on the same configuration. Here's why: > >>>>>> > >>>>>> a. With original snapshotting there is a strong duality between > >>>>>> a stream input (offsets) and committed side effects (internal > >>>>> states and external commits to transactional sinks). While in the > async > >>>>> version, there are uncommitted operations (inflight records). Thus, > you > >>>>> cannot use these snapshots for e.g., submitting sql queries with > >>> snapshot > >>>>> isolation. Also, the original snapshotting gives a lot of potential > for > >>>>> flink to make proper transactional commits externally. > >>>>>> > >>>>>> b. Reconfiguration is very tricky, you probably know that better. 
> >>>>> Inflight channel state is no longer valid in a new configuration > (i.e., > >>> new > >>>>> dataflow graph, new operators, updated operator logic, different > >>> channels, > >>>>> different parallelism) > >>>>>> > >>>>>> 2. Async snapshots can also be potentially useful for monitoring the > >>>>> general health of a dataflow since they can be analyzed by the task > >>> manager > >>>>> about the general performance of a job graph and spot bottlenecks for > >>>>> example. > >>>>>> > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> > wrote: > >>>>>>> > >>>>>>> Hi, > >>>>>>> > >>>>>>> Thomas: > >>>>>>> There are no Jira tickets yet (or maybe there is something very old > >>>>> somewhere). First we want to discuss it, next present FLIP and at > last > >>>>> create tickets :) > >>>>>>> > >>>>>>>> if I understand correctly, then the proposal is to not block any > >>>>>>>> input channel at all, but only log data from the backpressured > >>> channel > >>>>> (and > >>>>>>>> make it part of the snapshot) until the barrier arrives > >>>>>>> > >>>>>>> I would guess that it would be better to block the reads, unless we > >>> can > >>>>> already process the records from the blocked channel… > >>>>>>> > >>>>>>> Paris: > >>>>>>> > >>>>>>> Thanks for the explanation Paris. I’m starting to understand this > more > >>>>> and I like the idea of snapshotting the state of an operator before > >>>>> receiving all of the checkpoint barriers - this would allow more > things > >>> to > >>>>> happen at the same time instead of sequentially. As Zhijiang has > pointed > >>>>> out there are some things not considered in your proposal: overtaking > >>>>> output buffers, but maybe those things could be incorporated > together. > >>>>>>> > >>>>>>> Another thing is that from the wiki description I understood that > the > >>>>> initial checkpointing is not initialised by any checkpoint barrier, > but > >>> by > >>>>> an independent call/message from the Observer. I haven’t played with > >>> this > >>>>> idea a lot, but I had some discussion with Nico and it seems that it > >>> might > >>>>> work: > >>>>>>> > >>>>>>> 1. JobManager sends and RPC “start checkpoint” to all tasks > >>>>>>> 2. Task (with two input channels l1 and l2) upon receiving RPC from > >>> 1., > >>>>> takes a snapshot of it's state and: > >>>>>>> a) broadcast checkpoint barrier down the stream to all channels > (let’s > >>>>> ignore for a moment potential for this barrier to overtake the buffer > >>>>> output data) > >>>>>>> b) for any input channel for which it hasn’t yet received > checkpoint > >>>>> barrier, the data are being added to the checkpoint > >>>>>>> c) once a channel (for example l1) receives a checkpoint barrier, > the > >>>>> Task blocks reads from that channel (?) > >>>>>>> d) after all remaining channels (l2) receive checkpoint barriers, > the > >>>>> Task first has to process the buffered data after that it can > unblock > >>> the > >>>>> reads from the channels > >>>>>>> > >>>>>>> Checkpoint barriers do not cascade/flow through different tasks > here. > >>>>> Checkpoint barrier emitted from Task1, reaches only the immediate > >>>>> downstream Tasks. Thanks to this setup, total checkpointing time is > not > >>> sum > >>>>> of checkpointing times of all Tasks one by one, but more or less max > of > >>> the > >>>>> slowest Tasks. Right? > >>>>>>> > >>>>>>> Couple of intriguing thoughts are: > >>>>>>> 3. checkpoint barriers overtaking the output buffers > >>>>>>> 4. 
can we keep processing some data (in order to not waste CPU > cycles) > >>>>> after we have taking the snapshot of the Task. I think we could. > >>>>>>> > >>>>>>> Piotrek > >>>>>>> > >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote: > >>>>>>>> > >>>>>>>> Great discussion! I'm excited that this is already under > >>>>> consideration! Are > >>>>>>>> there any JIRAs or other traces of discussion to follow? > >>>>>>>> > >>>>>>>> Paris, if I understand correctly, then the proposal is to not > block > >>> any > >>>>>>>> input channel at all, but only log data from the backpressured > >>> channel > >>>>> (and > >>>>>>>> make it part of the snapshot) until the barrier arrives? This is > >>>>>>>> intriguing. But probably there is also a benefit of to not > continue > >>>>> reading > >>>>>>>> I1 since that could speed up retrieval from I2. Also, if the user > >>> code > >>>>> is > >>>>>>>> the cause of backpressure, this would avoid pumping more data into > >>> the > >>>>>>>> process function. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Thomas > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang < > wangzhijiang...@aliyun.com > >>>>> .invalid> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Paris, > >>>>>>>>> > >>>>>>>>> Thanks for the detailed sharing. And I think it is very similar > with > >>>>> the > >>>>>>>>> way of overtaking we proposed before. > >>>>>>>>> > >>>>>>>>> There are some tiny difference: > >>>>>>>>> The way of overtaking might need to snapshot all the input/output > >>>>> queues. > >>>>>>>>> Chandy Lamport seems only need to snaphost (n-1) input channels > >>> after > >>>>> the > >>>>>>>>> first barrier arrives, which might reduce the state sizea bit. > But > >>>>> normally > >>>>>>>>> there should be less buffers for the first input channel with > >>> barrier. > >>>>>>>>> The output barrier still follows with regular data stream in > Chandy > >>>>>>>>> Lamport, the same way as current flink. For overtaking way, we > need > >>>>> to pay > >>>>>>>>> extra efforts to make barrier transport firstly before outque > queue > >>> on > >>>>>>>>> upstream side, and change the way of barrier alignment based on > >>>>> receiving > >>>>>>>>> instead of current reading on downstream side. > >>>>>>>>> In the backpressure caused by data skew, the first barrier in > almost > >>>>> empty > >>>>>>>>> input channel should arrive much eariler than the last heavy load > >>>>> input > >>>>>>>>> channel, so the Chandy Lamport could benefit well. But for the > case > >>>>> of all > >>>>>>>>> balanced heavy load input channels, I mean the first arrived > barrier > >>>>> might > >>>>>>>>> still take much time, then the overtaking way could still fit > well > >>> to > >>>>> speed > >>>>>>>>> up checkpoint. > >>>>>>>>> Anyway, your proposed suggestion is helpful on my side, > especially > >>>>>>>>> considering some implementation details . > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Zhijiang > >>>>>>>>> > ------------------------------------------------------------------ > >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com> > >>>>>>>>> Send Time:2019年8月13日(星期二) 14:03 > >>>>>>>>> To:dev <dev@flink.apache.org> > >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com> > >>>>>>>>> Subject:Re: Checkpointing under backpressure > >>>>>>>>> > >>>>>>>>> yes! It’s quite similar I think. Though mind that the devil is > in > >>> the > >>>>>>>>> details, i.e., the temporal order actions are taken. 
> >>>>>>>>> > >>>>>>>>> To clarify, let us say you have a task T with two input channels > I1 > >>>>> and I2. > >>>>>>>>> The Chandy Lamport execution flow is the following: > >>>>>>>>> > >>>>>>>>> 1) T receives barrier from I1 and... > >>>>>>>>> 2) ...the following three actions happen atomically > >>>>>>>>> I ) T snapshots its state T* > >>>>>>>>> II) T forwards marker to its outputs > >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer M* > >>>>>>>>> - Also notice here that T does NOT block I1 as it does in aligned > >>>>>>>>> snapshots - > >>>>>>>>> 3) Eventually T receives barrier from I2 and stops recording > events. > >>>>> Its > >>>>>>>>> asynchronously captured snapshot is now complete: {T*,M*}. > >>>>>>>>> Upon recovery all messages of M* should be replayed in FIFO > order. > >>>>>>>>> > >>>>>>>>> With this approach alignment does not create a deadlock situation > >>>>> since > >>>>>>>>> anyway 2.II happens asynchronously and messages can be logged as > >>> well > >>>>>>>>> asynchronously during the process of the snapshot. If there is > >>>>>>>>> back-pressure in a pipeline the cause is most probably not this > >>>>> algorithm. > >>>>>>>>> > >>>>>>>>> Back to your observation, the answer : yes and no. In your > network > >>>>> model, > >>>>>>>>> I can see the logic of “logging” and “committing” a final > snapshot > >>>>> being > >>>>>>>>> provided by the channel implementation. However, do mind that the > >>>>> first > >>>>>>>>> barrier always needs to go “all the way” to initiate the Chandy > >>>>> Lamport > >>>>>>>>> algorithm logic. > >>>>>>>>> > >>>>>>>>> The above flow has been proven using temporal logic in my phd > thesis > >>>>> in > >>>>>>>>> case you are interested about the proof. > >>>>>>>>> I hope this helps a little clarifying things. Let me know if > there > >>> is > >>>>> any > >>>>>>>>> confusing point to disambiguate. I would be more than happy to > help > >>>>> if I > >>>>>>>>> can. > >>>>>>>>> > >>>>>>>>> Paris > >>>>>>>>> > >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> > >>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots > don’t > >>>>> you > >>>>>>>>> still have to wait for the “checkpoint barrier” to arrive in > order > >>> to > >>>>> know > >>>>>>>>> when have you already received all possible messages from the > >>> upstream > >>>>>>>>> tasks/operators? So instead of processing the “in flight” > messages > >>>>> (as the > >>>>>>>>> Flink is doing currently), you are sending them to an “observer”? > >>>>>>>>>> > >>>>>>>>>> In that case, that’s sounds similar to “checkpoint barriers > >>>>> overtaking > >>>>>>>>> in flight records” (aka unaligned checkpoints). Just for us, the > >>>>> observer > >>>>>>>>> is a snapshot state. > >>>>>>>>>> > >>>>>>>>>> Piotrek > >>>>>>>>>> > >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone < > seniorcarb...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Interesting problem! Thanks for bringing it up Thomas. > >>>>>>>>>>> > >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport > >>>>> snapshots > >>>>>>>>> [1] would help out solve this problem more elegantly without > >>>>> sacrificing > >>>>>>>>> correctness. > >>>>>>>>>>> - They do not need alignment, only (async) logging for > in-flight > >>>>>>>>> records between the time the first barrier is processed until the > >>> last > >>>>>>>>> barrier arrives in a task. 
> >>>>>>>>>>> - They work fine for failure recovery as long as logged records > >>> are > >>>>>>>>> replayed on startup. > >>>>>>>>>>> > >>>>>>>>>>> Flink’s “alligned” savepoints would probably be still necessary > >>> for > >>>>>>>>> transactional sink commits + any sort of reconfiguration (e.g., > >>>>> rescaling, > >>>>>>>>> updating the logic of operators to evolve an application etc.). > >>>>>>>>>>> > >>>>>>>>>>> I don’t completely understand the “overtaking” approach but if > you > >>>>> have > >>>>>>>>> a concrete definition I would be happy to check it out and help > if I > >>>>> can! > >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging > things > >>> in > >>>>>>>>> pending channels in a task snapshot before the barrier arrives. > >>>>>>>>>>> > >>>>>>>>>>> -Paris > >>>>>>>>>>> > >>>>>>>>>>> [1] > >>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm > >>>>> < > >>>>>>>>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm> > >>>>>>>>>>> > >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com > > > >>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi Thomas, > >>>>>>>>>>>> > >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of > >>> discussing > >>>>> how > >>>>>>>>> to address this issue and one of the solution that we are > discussing > >>>>> is > >>>>>>>>> exactly what you are proposing: checkpoint barriers overtaking > the > >>> in > >>>>>>>>> flight data and make the in flight data part of the checkpoint. > >>>>>>>>>>>> > >>>>>>>>>>>> If everything works well, we will be able to present result of > >>> our > >>>>>>>>> discussions on the dev mailing list soon. > >>>>>>>>>>>> > >>>>>>>>>>>> Piotrek > >>>>>>>>>>>> > >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang < > wangzhijiang...@aliyun.com > >>>>> .INVALID> > >>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi Thomas, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for proposing this concern. The barrier alignment > takes > >>>>> long > >>>>>>>>> time in backpressure case which could cause several problems: > >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned. > >>>>>>>>>>>>> 2. The recovery cost is high once failover, because much data > >>>>> needs > >>>>>>>>> to be replayed. > >>>>>>>>>>>>> 3. The delay for commit-based sink is high in exactly-once. > >>>>>>>>>>>>> > >>>>>>>>>>>>> For credit-based flow control from release-1.5, the amount of > >>>>>>>>> in-flighting buffers before barrier alignment is reduced, so we > >>> could > >>>>> get a > >>>>>>>>> bit > >>>>>>>>>>>>> benefits from speeding checkpoint aspect. > >>>>>>>>>>>>> > >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the channels which > >>>>> already > >>>>>>>>> received the barrier in practice. But actually we ever did the > >>>>> similar thing > >>>>>>>>>>>>> to speed barrier alighment before. I am not quite sure that > >>>>>>>>> release-1.8 covers this feature. There were some relevant > >>> discussions > >>>>> under > >>>>>>>>> jira [1]. > >>>>>>>>>>>>> > >>>>>>>>>>>>> For release-1.10, the community is now discussing the > feature of > >>>>>>>>> unaligned checkpoint which is mainly for resolving above > concerns. > >>> The > >>>>>>>>> basic idea > >>>>>>>>>>>>> is to make barrier overtakes the output/input buffer queue to > >>>>> speed > >>>>>>>>> alignment, and snapshot the input/output buffers as part of > >>> checkpoint > >>>>>>>>> state. The > >>>>>>>>>>>>> details have not confirmed yet and is still under discussion. 
> >>>>> Wish we > >>>>>>>>> could make some improvments for the release-1.10. > >>>>>>>>>>>>> > >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523 > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Zhijiang > >>>>>>>>>>>>> > >>> ------------------------------------------------------------------ > >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org> > >>>>>>>>>>>>> Send Time:2019年8月12日(星期一) 21:38 > >>>>>>>>>>>>> To:dev <dev@flink.apache.org> > >>>>>>>>>>>>> Subject:Checkpointing under backpressure > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi, > >>>>>>>>>>>>> > >>>>>>>>>>>>> One of the major operational difficulties we observe with > Flink > >>>>> are > >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for both > >>>>>>>>> confirmation > >>>>>>>>>>>>> of my understanding of the current behavior as well as > pointers > >>>>> for > >>>>>>>>> future > >>>>>>>>>>>>> improvement work: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Prior to introduction of credit based flow control in the > >>> network > >>>>>>>>> stack [1] > >>>>>>>>>>>>> [2], checkpoint barriers would back up with the data for all > >>>>> logical > >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the > buffers > >>> are > >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are only held > >>>>> back for > >>>>>>>>>>>>> channels that have backpressure, while others can continue > >>>>> processing > >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot "overtake > >>>>> data", > >>>>>>>>>>>>> therefore checkpoint alignment remains affected for the > channel > >>>>> with > >>>>>>>>>>>>> backpressure, with the potential for slow checkpointing and > >>>>> timeouts. > >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the maximum > >>>>> in-transit > >>>>>>>>>>>>> buffers per channel, resulting in an improvement compared to > >>>>> previous > >>>>>>>>>>>>> versions of Flink. Also, the backpressure based checkpoint > >>>>> alignment > >>>>>>>>> can > >>>>>>>>>>>>> help the barrier advance faster on the receiver side (by > >>>>> suspending > >>>>>>>>>>>>> channels that have already delivered the barrier). Is that > >>>>> accurate > >>>>>>>>> as of > >>>>>>>>>>>>> Flink 1.8? > >>>>>>>>>>>>> > >>>>>>>>>>>>> What appears to be missing to completely unblock > checkpointing > >>> is > >>>>> a > >>>>>>>>>>>>> mechanism for checkpoints to overtake the data. That would > help > >>> in > >>>>>>>>>>>>> situations where the processing itself is the bottleneck and > >>>>>>>>> prioritization > >>>>>>>>>>>>> in the network stack alone cannot address the barrier delay. > Was > >>>>>>>>> there any > >>>>>>>>>>>>> related discussion? One possible solution would be to drain > >>>>> incoming > >>>>>>>>> data > >>>>>>>>>>>>> till the barrier and make it part of the checkpoint instead > of > >>>>>>>>> processing > >>>>>>>>>>>>> it. This is somewhat related to asynchronous processing, but > I'm > >>>>>>>>> thinking > >>>>>>>>>>>>> more of a solution that is automated in the Flink runtime for > >>> the > >>>>>>>>>>>>> backpressure scenario only. 
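Whichever variant is chosen (draining the data up to the barrier, Chandy-Lamport-style logging, or barriers overtaking buffers), the recovery contract stressed elsewhere in this thread is the same: the captured in-flight records must be replayed in FIFO order before the task resumes normal input, otherwise causality can be violated. A minimal, hypothetical sketch of that recovery order (again not Flink code; StateBackend, InputGate, etc. are made-up names):

    import java.util.List;

    // Illustrative only -- recovery from a checkpoint that contains in-flight records.
    class RecoveryOrderSketch {

        interface OperatorState { }

        interface StateBackend {
            OperatorState restoreOperatorState(long checkpointId);
            List<Object> restoreLoggedRecords(long checkpointId); // FIFO order, as captured
        }

        interface InputGate {
            Object pollNext() throws InterruptedException;
        }

        static class RecoveringTask {
            private final StateBackend backend;
            private final InputGate inputs;
            private OperatorState state;

            RecoveringTask(StateBackend backend, InputGate inputs) {
                this.backend = backend;
                this.inputs = inputs;
            }

            void recoverAndRun(long checkpointId) throws InterruptedException {
                // 1. Restore the operator state captured at the checkpoint.
                state = backend.restoreOperatorState(checkpointId);

                // 2. Replay the logged in-flight records first, strictly in FIFO order.
                //    The restored state does not include their effects yet, so skipping or
                //    interleaving them with new input would violate causality.
                for (Object record : backend.restoreLoggedRecords(checkpointId)) {
                    process(record);
                }

                // 3. Only now return to normal input operation.
                while (true) {
                    process(inputs.pollNext());
                }
            }

            void process(Object record) { /* user function */ }
        }
    }

As Paris notes earlier in the thread, this also means recovery from such a snapshot is expected to be slower than recovery from an aligned snapshot.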
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> Thomas > >>>>>>>>>>>>> > >>>>>>>>>>>>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html > >>>>>>>>>>>>> [2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn