+1 I think we are on the same page, Stephan. Rescaling on unaligned checkpoints sounds challenging and a bit unnecessary. No? Why not stick to aligned snapshots for live reconfiguration/rescaling? It's a pretty rare operation and it would simplify things a lot. Everything can be "staged" upon alignment, including replacing channels and tasks.
-Paris

> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
>
> Hi all!
>
> Yes, the first proposal of "unaligned checkpoints" (probably two years back now) drew major inspiration from Chandy-Lamport, as did the original checkpointing algorithm, actually.
>
> "Logging data between first and last barrier" versus "barrier jumping over buffers and storing those buffers" is pretty much the same thing. However, there are a few nice benefits of the unaligned-checkpoints proposal over Chandy-Lamport.
>
> *## Benefits of Unaligned Checkpoints*
>
> (1) It is very similar to the original algorithm (it can be seen as an optional feature purely in the network stack) and thus can share lots of code paths.
>
> (2) Less data stored. If we make the "jump over buffers" part timeout-based (for example, the barrier overtakes buffers if they are not flushed within 10 ms), then in the common case of flowing pipelines checkpoints are aligned without in-flight data. Only back-pressured cases store some in-flight data, which means we don't regress in the common case and only fix the back-pressure case.
>
> (3) Faster checkpoints. Chandy-Lamport still waits for all barriers to arrive naturally, logging on the way. If data processing is slow, this can still take quite a while.
>
> ==> I think both these points are strong reasons not to change the mechanism away from "trigger sources" and start with CL-style "trigger all".
>
> *## Possible ways to combine Chandy-Lamport and Unaligned Checkpoints*
>
> We can think about something like "take state snapshot on first barrier" and then store buffers until the other barriers arrive. Inside the network stack, barriers could still overtake and persist buffers. The benefit would be less latency increase in the channels which have already received barriers. However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
>
> *## Concerning upgrades*
>
> I think it is a fair restriction to say that upgrades need to happen on aligned checkpoints. It is a rare enough operation.
>
> *## Concerning re-scaling (changing parallelism)*
>
> We need to support that on unaligned checkpoints as well. There are several feature proposals about automatic scaling, especially downscaling in case of missing resources. The last snapshot might be a regular checkpoint, so all checkpoints need to support rescaling.
>
> *## Concerning end-to-end checkpoint duration and "trigger sources" versus "trigger all"*
>
> I think for the end-to-end checkpoint duration, an "overtake buffers" approach yields faster checkpoints, as mentioned above (Chandy-Lamport logging still needs to wait for the barriers to flow).
>
> I don't see the benefit of a "trigger all tasks via RPC concurrently" approach. Bear in mind that it is still a globally coordinated approach and you need to wait for the global checkpoint to complete before committing any side effects. I believe that the checkpoint time is determined more by the state checkpoint writing, the global coordination, and the metadata commit than by the difference in alignment time between "trigger from sources and jump over buffers" versus "trigger all tasks concurrently".
>
> Trying to optimize a few tens of milliseconds out of the network stack (and changing the overall checkpointing approach completely for that) while staying with a globally coordinated checkpoint will send us down a path to a dead end.
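(As a side note, a minimal self-contained sketch of the timeout-based "overtake" decision from (2) above could look roughly like the following. None of these types exist in Flink; they are illustrative stand-ins for the decision logic, not the real network stack.)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustrative stand-in only (not Flink's network stack, which is event-driven):
// models the "barrier may overtake buffers after a flush timeout" decision.
class TimeoutBasedBarrierQueue {

    private final ConcurrentLinkedDeque<byte[]> bufferedRecords = new ConcurrentLinkedDeque<>();
    private final long overtakeTimeoutMillis;

    TimeoutBasedBarrierQueue(long overtakeTimeoutMillis) {
        this.overtakeTimeoutMillis = overtakeTimeoutMillis;
    }

    /** Producer side: enqueue a serialized in-flight record. */
    void addRecord(byte[] serializedRecord) {
        bufferedRecords.add(serializedRecord);
    }

    /** Consumer side: drain one buffer at a time. */
    byte[] poll() {
        return bufferedRecords.poll();
    }

    /**
     * Called when a checkpoint barrier reaches this queue. If the consumer drains the
     * queue within the timeout (the common, non-back-pressured case), the checkpoint is
     * effectively aligned and nothing is stored. Otherwise the barrier "jumps over" the
     * remaining buffers, which are returned so they can be persisted with the checkpoint
     * (races between the copy and concurrent polls are ignored for brevity).
     */
    List<byte[]> onBarrier() throws InterruptedException {
        long deadline = System.currentTimeMillis() + overtakeTimeoutMillis;
        while (!bufferedRecords.isEmpty() && System.currentTimeMillis() < deadline) {
            Thread.sleep(1); // crude wait; a real implementation would be notified on flush
        }
        List<byte[]> inFlightToPersist = new ArrayList<>(bufferedRecords);
        bufferedRecords.clear();
        return inFlightToPersist;
    }
}

(With a timeout of, say, 10 ms, a flowing pipeline drains the queue before the deadline and the checkpoint stays free of in-flight data, while a back-pressured queue hands its remaining buffers to the checkpoint immediately.)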
> To really bring task persistence latency down to tens of milliseconds (so we can frequently commit in sinks), we need to take an approach without any global coordination. Tasks need to establish a persistent recovery point individually and at their own discretion; only then can it be frequent enough. To get there, they would need to decouple themselves from the predecessor and successor tasks (via something like persistent channels). This is a different discussion, though, somewhat orthogonal to this one here.
>
> Best,
> Stephan
>
> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi again,
>>
>> Zhu Zhu, let me think about this more. Maybe, as Paris is writing, we do not need to block any channels at all, at least assuming credit-based flow control. What should happen with the following checkpoint is another question. Also, should we support concurrent checkpoints and subsuming checkpoints as we do now? Maybe not…
>>
>> Paris
>>
>> Re
>> I. 2. a) and b) - yes, this would have to be taken into account
>> I. 2. c) and IV. 2. - without those, the end-to-end checkpoint time will probably be longer than it could be. It might affect external systems. For example Kafka, which automatically times out lingering transactions, and for us the transaction time is equal to the time between two checkpoints.
>>
>> II. 1. - I'm confused. To set things straight: Flink currently snapshots once it receives all of the checkpoint barriers from all of the input channels, and only then does it broadcast the checkpoint barrier down the stream. And this is correct from an exactly-once perspective.
>>
>> As far as I understand, your proposal, based on the Chandy-Lamport algorithm, is to snapshot the state of the operator on the first checkpoint barrier, which also looks correct to me.
>>
>> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
>>
>> V. Yes, we still need aligned checkpoints, as they are easier for state migration and upgrades.
>>
>> Piotrek
>>
>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> wrote:
>>>
>>> Now I see a little more clearly what you have in mind. Thanks for the explanation! There are a few intermixed concepts here, some having to do with correctness, some with performance. Before delving deeper I will just enumerate a few things to make myself a little more helpful if I can.
>>>
>>> I. Initiation
>>> -------------
>>>
>>> 1. An RPC to sources only is a less intrusive way to initiate snapshots, since you utilize pipeline parallelism better (only a small subset of tasks is progressively running the protocol at a time; if snapshotting is async, the overall overhead might not even be observable).
>>>
>>> 2. If we really want an RPC-to-all initiation, take notice of the following implications:
>>>
>>> a. (correctness) RPC calls are not guaranteed to arrive in every task before a marker from a preceding task.
>>>
>>> b. (correctness) Either the RPC call OR the first arriving marker should initiate the algorithm, whichever comes first. If you only do it per RPC call, then you capture a "late" state that includes side effects of already logged events.
>>>
>>> c. (performance) Lots of I/O will be invoked at the same time on the backend store from all tasks. This might lead to high congestion in async snapshots.
>>>
>>> II. Capturing State First
>>> -------------------------
>>>
>>> 1. (correctness) Capturing state at the last marker sounds incorrect to me (the state contains side effects of already logged events, based on the proposed scheme). This results in duplicate processing. No?
>>>
>>> III. Channel Blocking / "Alignment"
>>> -----------------------------------
>>>
>>> 1. (performance?) What is the added benefit? We don't want a "complete" transactional snapshot; async snapshots are purely for failure recovery. Thus, I don't see why this needs to be imposed at the expense of performance/throughput. With the proposed scheme the whole dataflow anyway enters snapshotting/logging mode, so tasks more or less snapshot concurrently.
>>>
>>> IV. Marker Bypassing
>>> -------------------
>>>
>>> 1. (correctness) This leads to equivalent in-flight snapshots, so with some quick thinking it looks correct. I will try to model this later and get back to you in case I find something wrong.
>>>
>>> 2. (performance) It also sounds like a meaningful optimisation! I like thinking of this as a push-based snapshot, i.e., the producing task somehow triggers the downstream consumer/channel to capture its state. As an example, consider T1 -> |marker t1| -> T2.
>>>
>>> V. Usage of "Async" Snapshots
>>> ---------------------
>>>
>>> 1. Do you see this as a full replacement of "full" aligned snapshots/savepoints? In my view async snapshots will be needed from time to time, but not as frequently. Yet, it seems like a valid approach solely for failure recovery on the same configuration. Here's why:
>>>
>>> a. With the original snapshotting there is a strong duality between a stream input (offsets) and committed side effects (internal states and external commits to transactional sinks), while in the async version there are uncommitted operations (in-flight records). Thus, you cannot use these snapshots for, e.g., submitting SQL queries with snapshot isolation. Also, the original snapshotting gives Flink a lot of potential to make proper transactional commits externally.
>>>
>>> b. Reconfiguration is very tricky, you probably know that better. In-flight channel state is no longer valid in a new configuration (i.e., new dataflow graph, new operators, updated operator logic, different channels, different parallelism).
>>>
>>> 2. Async snapshots can also be potentially useful for monitoring the general health of a dataflow, since they can be analyzed by the task manager to assess the general performance of a job graph and spot bottlenecks, for example.
>>>
>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thomas:
>>>> There are no Jira tickets yet (or maybe there is something very old somewhere). First we want to discuss it, next present a FLIP, and at last create tickets :)
>>>>
>>>>> if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives
>>>>
>>>> I would guess that it would be better to block the reads, unless we can already process the records from the blocked channel…
>>>>
>>>> Paris:
>>>>
>>>> Thanks for the explanation Paris. I'm starting to understand this more and I like the idea of snapshotting the state of an operator before receiving all of the checkpoint barriers - this would allow more things to happen at the same time instead of sequentially.
>>>> As Zhijiang has pointed out, there are some things not considered in your proposal (overtaking output buffers), but maybe those things could be incorporated together.
>>>>
>>>> Another thing is that from the wiki description I understood that the initial checkpointing is not initiated by any checkpoint barrier, but by an independent call/message from the Observer. I haven't played with this idea a lot, but I had some discussion with Nico and it seems that it might work:
>>>>
>>>> 1. JobManager sends an RPC "start checkpoint" to all tasks
>>>> 2. A Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a snapshot of its state and:
>>>> a) broadcasts a checkpoint barrier downstream to all channels (let's ignore for a moment the potential for this barrier to overtake the buffered output data)
>>>> b) for any input channel for which it hasn't yet received a checkpoint barrier, the data is added to the checkpoint
>>>> c) once a channel (for example l1) receives a checkpoint barrier, the Task blocks reads from that channel (?)
>>>> d) after all remaining channels (l2) receive checkpoint barriers, the Task first has to process the buffered data; after that it can unblock the reads from the channels
>>>>
>>>> Checkpoint barriers do not cascade/flow through different tasks here. A checkpoint barrier emitted from Task1 reaches only the immediate downstream Tasks. Thanks to this setup, the total checkpointing time is not the sum of the checkpointing times of all Tasks one by one, but more or less the max over the slowest Tasks. Right?
>>>>
>>>> A couple of intriguing thoughts:
>>>> 3. checkpoint barriers overtaking the output buffers
>>>> 4. can we keep processing some data (in order to not waste CPU cycles) after we have taken the snapshot of the Task? I think we could.
>>>>
>>>> Piotrek
>>>>
>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>> Great discussion! I'm excited that this is already under consideration! Are there any JIRAs or other traces of discussion to follow?
>>>>>
>>>>> Paris, if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives? This is intriguing. But probably there is also a benefit to not continuing to read I1, since that could speed up retrieval from I2. Also, if the user code is the cause of backpressure, this would avoid pumping more data into the process function.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
>>>>>
>>>>>> Hi Paris,
>>>>>>
>>>>>> Thanks for the detailed sharing. I think it is very similar to the way of overtaking we proposed before.
>>>>>>
>>>>>> There are some tiny differences:
>>>>>> The way of overtaking might need to snapshot all the input/output queues. Chandy-Lamport seems to only need to snapshot (n-1) input channels after the first barrier arrives, which might reduce the state size a bit. But normally there should be fewer buffers for the first input channel with the barrier.
>>>>>> The output barrier still follows the regular data stream in Chandy-Lamport, the same way as in current Flink.
>>>>>> For the overtaking way, we need to pay extra effort to make the barrier transfer first, ahead of the output queue on the upstream side, and to change barrier alignment to be based on receiving instead of the current reading on the downstream side.
>>>>>> In backpressure caused by data skew, the first barrier in an almost empty input channel should arrive much earlier than in the last heavily loaded input channel, so Chandy-Lamport could benefit well. But for the case of all-balanced, heavily loaded input channels, the first arriving barrier might still take much time, so the overtaking way could still fit well to speed up the checkpoint.
>>>>>> Anyway, your proposed suggestion is helpful on my side, especially considering some implementation details.
>>>>>>
>>>>>> Best,
>>>>>> Zhijiang
>>>>>> ------------------------------------------------------------------
>>>>>> From: Paris Carbone <seniorcarb...@gmail.com>
>>>>>> Send Time: Tue, Aug 13, 2019 14:03
>>>>>> To: dev <dev@flink.apache.org>
>>>>>> Cc: zhijiang <wangzhijiang...@aliyun.com>
>>>>>> Subject: Re: Checkpointing under backpressure
>>>>>>
>>>>>> yes! It's quite similar I think. Though mind that the devil is in the details, i.e., the temporal order in which actions are taken.
>>>>>>
>>>>>> To clarify, let us say you have a task T with two input channels I1 and I2. The Chandy-Lamport execution flow is the following:
>>>>>>
>>>>>> 1) T receives a barrier from I1 and...
>>>>>> 2) ...the following three actions happen atomically:
>>>>>> I) T snapshots its state T*
>>>>>> II) T forwards the marker to its outputs
>>>>>> III) T starts logging all events of I2 (only) into a buffer M*
>>>>>> - Also notice here that T does NOT block I1 as it does in aligned snapshots -
>>>>>> 3) Eventually T receives the barrier from I2 and stops recording events. Its asynchronously captured snapshot is now complete: {T*, M*}.
>>>>>> Upon recovery all messages of M* should be replayed in FIFO order.
>>>>>>
>>>>>> With this approach alignment does not create a deadlock situation, since 2.II happens asynchronously anyway and messages can likewise be logged asynchronously during the process of the snapshot. If there is back-pressure in a pipeline, the cause is most probably not this algorithm.
>>>>>>
>>>>>> Back to your observation, the answer: yes and no. In your network model, I can see the logic of "logging" and "committing" a final snapshot being provided by the channel implementation. However, do mind that the first barrier always needs to go "all the way" to initiate the Chandy-Lamport algorithm logic.
>>>>>>
>>>>>> The above flow has been proven using temporal logic in my PhD thesis, in case you are interested in the proof. I hope this helps clarify things a little. Let me know if there is any confusing point to disambiguate. I would be more than happy to help if I can.
>>>>>>
>>>>>> Paris
>>>>>>
>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>
>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots: don't you still have to wait for the "checkpoint barrier" to arrive in order to know when you have already received all possible messages from the upstream tasks/operators? So instead of processing the "in flight" messages (as Flink is doing currently), you are sending them to an "observer"?
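(A minimal, self-contained sketch of the per-task Chandy-Lamport flow described above — snapshot on the first barrier, forward it, and log the remaining channels until their barriers arrive. The types below are purely illustrative stand-ins, not Flink's actual classes.)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: per-task handling of barriers and records
// following the flow Paris describes (snapshot on first barrier, log the rest).
class ChandyLamportTask {

    private final Set<String> allInputChannels;
    private final Set<String> channelsStillLogged = new HashSet<>();
    private final Map<String, List<byte[]>> channelLogs = new HashMap<>();
    private boolean snapshotTaken = false;
    private byte[] operatorStateSnapshot;

    ChandyLamportTask(Set<String> inputChannels) {
        this.allInputChannels = inputChannels;
    }

    /** Called when a barrier for the current checkpoint arrives on a channel. */
    void onBarrier(String channel) {
        if (!snapshotTaken) {
            // First barrier: capture state T*, forward the marker, and start
            // logging every other channel into its buffer (the M* of the snapshot).
            operatorStateSnapshot = snapshotOperatorState();
            forwardBarrierToOutputs();
            channelsStillLogged.addAll(allInputChannels);
            channelsStillLogged.remove(channel);
            channelsStillLogged.forEach(c -> channelLogs.put(c, new ArrayList<>()));
            snapshotTaken = true;
        } else {
            // A later barrier: this channel's in-flight data is now fully logged.
            channelsStillLogged.remove(channel);
        }
        if (snapshotTaken && channelsStillLogged.isEmpty()) {
            // Snapshot {T*, M*} complete.
            persistSnapshot(operatorStateSnapshot, channelLogs);
        }
    }

    /** Called for every record; no channel is blocked, records are processed normally. */
    void onRecord(String channel, byte[] record) {
        if (snapshotTaken && channelsStillLogged.contains(channel)) {
            channelLogs.get(channel).add(record);
        }
        process(record);
    }

    // Stand-ins for the actual state backend / network / operator calls.
    private byte[] snapshotOperatorState() { return new byte[0]; }
    private void forwardBarrierToOutputs() {}
    private void persistSnapshot(byte[] state, Map<String, List<byte[]>> logs) {}
    private void process(byte[] record) {}
}

(On recovery, the logged M* buffers would be replayed in FIFO order before regular input is consumed.)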
>>>>>>> In that case, that sounds similar to "checkpoint barriers overtaking in-flight records" (aka unaligned checkpoints). Just for us, the observer is the snapshot state.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
>>>>>>>>
>>>>>>>> Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly without sacrificing correctness.
>>>>>>>> - They do not need alignment, only (async) logging of in-flight records between the time the first barrier is processed and the time the last barrier arrives in a task.
>>>>>>>> - They work fine for failure recovery as long as logged records are replayed on startup.
>>>>>>>>
>>>>>>>> Flink's "aligned" savepoints would probably still be necessary for transactional sink commits + any sort of reconfiguration (e.g., rescaling, updating the logic of operators to evolve an application, etc.).
>>>>>>>>
>>>>>>>> I don't completely understand the "overtaking" approach, but if you have a concrete definition I would be happy to check it out and help if I can! Mind that Chandy-Lamport essentially does this by logging things in pending channels in a task snapshot before the barrier arrives.
>>>>>>>>
>>>>>>>> -Paris
>>>>>>>>
>>>>>>>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
>>>>>>>>
>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> As Zhijiang has responded, we are now in the process of discussing how to address this issue, and one of the solutions we are discussing is exactly what you are proposing: checkpoint barriers overtaking the in-flight data and making the in-flight data part of the checkpoint.
>>>>>>>>>
>>>>>>>>> If everything works well, we will be able to present the results of our discussions on the dev mailing list soon.
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> Thanks for raising this concern. Barrier alignment takes a long time in the backpressure case, which can cause several problems:
>>>>>>>>>> 1. Checkpoint timeouts, as you mentioned.
>>>>>>>>>> 2. The recovery cost is high upon failover, because much data needs to be replayed.
>>>>>>>>>> 3. The delay for commit-based sinks is high in exactly-once mode.
>>>>>>>>>>
>>>>>>>>>> With credit-based flow control from release 1.5, the amount of in-flight buffers before barrier alignment is reduced, so we get some benefit in terms of speeding up checkpoints.
>>>>>>>>>>
>>>>>>>>>> In release 1.8, I guess in practice we do not suspend the channels which have already received the barrier. But we actually did a similar thing to speed up barrier alignment before. I am not quite sure whether release 1.8 covers this feature. There were some relevant discussions under the JIRA [1].
>>>>>>>>>>
>>>>>>>>>> For release 1.10, the community is now discussing the feature of unaligned checkpoints, which is mainly meant to resolve the above concerns.
>>>>>>>>>> The basic idea is to make the barrier overtake the output/input buffer queues to speed up alignment, and to snapshot the input/output buffers as part of the checkpoint state. The details have not been confirmed yet and are still under discussion. Hopefully we can make some improvements for release 1.10.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Zhijiang
>>>>>>>>>> ------------------------------------------------------------------
>>>>>>>>>> From: Thomas Weise <t...@apache.org>
>>>>>>>>>> Send Time: Mon, Aug 12, 2019 21:38
>>>>>>>>>> To: dev <dev@flink.apache.org>
>>>>>>>>>> Subject: Checkpointing under backpressure
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> One of the major operational difficulties we observe with Flink is checkpoint timeouts under backpressure. I'm looking both for confirmation of my understanding of the current behavior and for pointers for future improvement work:
>>>>>>>>>>
>>>>>>>>>> Prior to the introduction of credit-based flow control in the network stack [1] [2], checkpoint barriers would back up with the data for all logical channels due to TCP backpressure. Since Flink 1.5, the buffers are controlled per channel, and checkpoint barriers are only held back for channels that have backpressure, while others can continue processing normally. However, checkpoint barriers still cannot "overtake data"; therefore checkpoint alignment remains affected for the channel with backpressure, with the potential for slow checkpointing and timeouts. The delay of barriers is, however, capped by the maximum number of in-transit buffers per channel, resulting in an improvement compared to previous versions of Flink. Also, the backpressure-based checkpoint alignment can help the barrier advance faster on the receiver side (by suspending channels that have already delivered the barrier). Is that accurate as of Flink 1.8?
>>>>>>>>>>
>>>>>>>>>> What appears to be missing to completely unblock checkpointing is a mechanism for checkpoints to overtake the data. That would help in situations where the processing itself is the bottleneck and prioritization in the network stack alone cannot address the barrier delay. Was there any related discussion? One possible solution would be to drain incoming data up to the barrier and make it part of the checkpoint instead of processing it. This is somewhat related to asynchronous processing, but I'm thinking more of a solution that is automated in the Flink runtime for the backpressure scenario only.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
>>>>>>>>>> [2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn