Thanks for the update and write-up, Arvid.

Piotrek
On Thu, Jul 30, 2020 at 11:05 AM Arvid Heise <ar...@ververica.com> wrote:

> Dear all,
>
> I just wanted to follow up on this long discussion thread by announcing that we implemented unaligned checkpoints in Flink 1.11. If you experience long end-to-end checkpointing durations, you should try out unaligned checkpoints [1] if the following applies:
>
> - Checkpointing is not bottlenecked by I/O (to the state backend). Possible reasons are slow connections, rate limits, or huge operator or user state.
> - You can attribute the long duration to slow data flow: an operator in the pipeline is severely lagging behind, and you can easily see it in the Flink Web UI.
> - You cannot alleviate the problem by adjusting the degree of parallelism of the slow operator, either because of temporal spikes or lags, or because you don't control the application in a platform-as-a-service architecture.
>
> You can enable it in flink-conf.yaml:
>
>     execution.checkpointing.unaligned: true
>
> Or in your application:
>
>     env.getCheckpointConfig().enableUnalignedCheckpoints()      (Java/Scala)
>     env.get_checkpoint_config().enable_unaligned_checkpoints()  (Python)
>
> Note that this relatively young feature still has a couple of limitations that we will resolve in future versions.
>
> - You cannot rescale or change the job graph when starting from an unaligned checkpoint; you have to take a savepoint before rescaling. Savepoints are always aligned, independent of the alignment setting of checkpoints. This feature has the highest priority and will be available in upcoming releases.
> - Flink currently does not support concurrent unaligned checkpoints. However, due to the more predictable and shorter checkpointing times, concurrent checkpoints might not be needed at all. Savepoints can also not happen concurrently with unaligned checkpoints, so they will take slightly longer.
> - SourceFunctions are user-defined, run in a separate thread, and output records under a lock. When they block because of backpressure, the induced checkpoints cannot acquire the lock and the checkpointing duration increases. We will provide SourceFunctions a way to also avoid blocking and implement it for all sources in Flink core, but because the code is ultimately user-defined, we have no way to guarantee non-blocking behavior. Nevertheless, since only sources are affected, the checkpointing durations are still much lower and, most importantly, do not increase with further shuffles. Furthermore, Flink 1.11 also provides a new way to implement sources (FLIP-27). This new source interface has a better threading model, such that users do not create their own threads anymore and Flink can guarantee non-blocking behavior for these sources.
> - Unaligned checkpoints break an implicit guarantee with respect to watermarks during recovery. Currently, Flink generates the watermark as a first step of recovery instead of storing the latest watermark in the operators, to ease rescaling. For unaligned checkpoints, this means that, on recovery, Flink generates watermarks after it restores in-flight data. If your pipeline uses an operator that applies the latest watermark to each record, it will produce different results than with aligned checkpoints. If your operator depends on the latest watermark always being available, the proper solution is to store the watermark in operator state. To support rescaling, watermarks should be stored per key-group in a union state. This feature has a high priority.
> - Lastly, there is a conceptual weakness in unaligned checkpoints: when an operator produces an arbitrary number of outputs for a single input, such as flatMap, all of these output records need to be stored in the state of the unaligned checkpoint, which may increase the state size by orders of magnitude and slow down checkpointing and recovery. However, since flatMap only needs alignment after a shuffle and rarely produces a huge number of records for a single input, it's more of a theoretical problem.
>
> Lastly, we also plan to improve the configuration, such that ultimately unaligned checkpoints will be the default:
>
> - Users will be able to configure a timeout, such that each operator first tries to perform an aligned checkpoint. If the timeout triggers, it switches to an unaligned checkpoint. Since the timeout would only trigger in the niche use cases that unaligned checkpoints address, it would mostly perform an aligned checkpoint under no or low backpressure. Thus, together with the previously mentioned fixes for the limitations, this timeout would allow Flink to enable unaligned checkpoints by default.
> - Another idea is to let users define a maximum state size for the in-flight data. However, it might be hard for users to configure the size correctly, as it requires knowing how many buffers are used in the respective application, and it might be even harder to actually use the size limit in a meaningful way.
> - Lastly, to address the flatMap issue, there will be an option to trigger unaligned checkpoints on the last barrier of all input channels instead of the first. Thus, there is still an alignment phase, but it should be rather quick, as checkpoint barriers are still inserted at the head of the output buffer queue. Conceptually, checkpoint barriers would still not depend on the data flow.
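[Editor's note] The flatMap weakness described above boils down to simple arithmetic: whatever sits in the output queue when the barrier overtakes it must be persisted with the checkpoint. A minimal plain-Java sketch of that effect (this is a toy model for illustration, not Flink code; all names are made up):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model: how many in-flight records an unaligned checkpoint must persist
 *  when a flatMap-style operator amplifies each input into many outputs. */
public class UnalignedStateGrowth {

    /** Returns the number of output-queue records the checkpoint has to store. */
    static int recordsToPersist(int bufferedInputs, int outputsPerInput) {
        Deque<String> outputQueue = new ArrayDeque<>();
        // The operator drains its buffered inputs, amplifying each one.
        for (int i = 0; i < bufferedInputs; i++) {
            for (int j = 0; j < outputsPerInput; j++) {
                outputQueue.add("record-" + i + "-" + j);
            }
        }
        // Unaligned checkpoint: the barrier jumps to the head of the output
        // queue, so everything still queued becomes part of the snapshot.
        return outputQueue.size();
    }

    public static void main(String[] args) {
        // 100 buffered inputs, 1 output each: 100 records in the snapshot.
        System.out.println(recordsToPersist(100, 1));
        // Same inputs through a 1-to-50 flatMap: 5000 records, 50x the state.
        System.out.println(recordsToPersist(100, 50));
    }
}
```

This is why the thread calls it a conceptual rather than practical weakness: the blow-up only matters when a high-amplification operator sits directly before a shuffle under backpressure.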
> We are currently preparing a blog post on this topic, from which I copied some passages. We are happy to hear your feedback.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#unaligned-checkpointing
>
> On Wed, Dec 4, 2019 at 9:07 PM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Arvid,
> >
> > Thanks for putting together the proposal [1].
> >
> > I'm planning to take a closer look in the next few days.
> >
> > Has any of the work been translated to JIRAs yet, and what would be the approximate target release?
> >
> > Thanks,
> > Thomas
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> >
> > On Wed, Oct 2, 2019 at 12:11 PM Arvid Heise <ar...@ververica.com> wrote:
> >
> >> Sorry, incorrect link; please follow [1].
> >>
> >> [1] https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3CCAGZNd0FgVL0oDQJHpBwJ1Ha8QevsVG0FHixdet11tLhW2p-2hg%40mail.gmail.com%3E
> >>
> >> On Wed, Oct 2, 2019 at 3:44 PM Arvid Heise <ar...@ververica.com> wrote:
> >>
> >> > FYI, we published FLIP-76 to address the issue, and a discussion has been opened in [1].
> >> >
> >> > Looking forward to your feedback,
> >> >
> >> > Arvid
> >> >
> >> > [1] https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser
> >> >
> >> > On Thu, Aug 15, 2019 at 9:43 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >> Many thanks for the great points!
> >> >>
> >> >> For prioritizing inputs, from another point of view, I think it might not cause other bad effects, since we do not need to totally block the channels that have seen barriers after the operator has taken its snapshot. After the snapshotting, if the channels that have not seen barriers have buffers, we can first log and process these buffers; if they do not have buffers, we can still process the buffers from the channels that have seen barriers. Therefore, it seems that prioritizing inputs should be able to accelerate the checkpoint without other bad effects.
> >> >>
> >> >> And @zhijiang: for making the unaligned checkpoint the only mechanism for all cases, I still think we should allow a configurable timeout after receiving the first barrier, so that the channels may get "drained" during the timeout, as pointed out by Stephan. With such a timeout, we very likely do not need to snapshot the input buffers, which would be very similar to the current aligned checkpoint mechanism.
> >> >>
> >> >> Best,
> >> >> Yun
> >> >>
> >> >> ------------------------------------------------------------------
> >> >> From: zhijiang <wangzhijiang...@aliyun.com.INVALID>
> >> >> Send Time: Thu, Aug 15, 2019, 02:22
> >> >> To: dev <dev@flink.apache.org>
> >> >> Subject: Re: Checkpointing under backpressure
> >> >>
> >> >> > For the checkpoint to complete, any buffer that arrived prior to the barrier would need to be part of the checkpointed state.
> >> >>
> >> >> Yes, I agree.
> >> >>
> >> >> > So wouldn't it be important to finish persisting these buffers as fast as possible by prioritizing the respective inputs? The task won't be able to quickly process records from the inputs that have seen the barrier when it is already backpressured (or causing the backpressure).
> >> >>
> >> >> My previous understanding of prioritizing inputs was from the task-processing aspect after snapshotting the state. From the buffer-persisting aspect, I think it depends on how we implement it.
> >> >> If we only tag/reference which buffers in the inputs are part of the state, and do the real persisting work asynchronously, then the already-tagged buffers can be processed by the task without any priority. Only after all the persisting work is done would the task report the finished checkpoint on its side to the coordinator. The key point is to implement it such that the task can continue processing buffers as soon as possible.
> >> >>
> >> >> Thanks for the further explanation of the requirements for speeding up checkpoints in backpressure scenarios. Making a savepoint finish quickly and then tuning the settings to avoid the backpressure is indeed a practical case. I think this solution could cover this concern.
> >> >>
> >> >> Best,
> >> >> Zhijiang
> >> >> ------------------------------------------------------------------
> >> >> From: Thomas Weise <t...@apache.org>
> >> >> Send Time: Wed, Aug 14, 2019, 19:48
> >> >> To: dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> >> >> Subject: Re: Checkpointing under backpressure
> >> >>
> >> >> -->
> >> >>
> >> >> On Wed, Aug 14, 2019 at 10:23 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
> >> >>
> >> >> > Thanks for these great points and discussions!
> >> >> >
> >> >> > 1. Considering the way of triggering checkpoint RPC calls to all the tasks from Chandy-Lamport: it combines two different mechanisms to make sure that the trigger can be fast in different scenarios. But in the Flink world it might not be worth trying that way, as per Stephan's analysis. Another concern is that it might put a heavier load on the JobMaster to broadcast this checkpoint RPC to all the tasks in a large-scale job, especially with very short checkpoint intervals.
> >> >> > Furthermore, it would also cause other important RPCs to be executed with delay, bringing potential timeout risks.
> >> >> >
> >> >> > 2. I agree with the idea of drawing on "take the state snapshot on the first barrier" from Chandy-Lamport, instead of barrier alignment, combined with unaligned checkpoints in Flink.
> >> >> >
> >> >> > > The benefit would be less latency increase in the channels which already have received barriers.
> >> >> > > However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
> >> >> >
> >> >> > I think we will not have an adverse effect if we do not prioritize the inputs without barriers in this case. After the sync snapshot, the task can actually process any input channel. For the input channel receiving the first barrier, we already have an obvious boundary for persisting buffers. For the other channels without barriers, we can persist the subsequent buffers for these channels until the barrier arrives from the network. Because of credit-based flow control, the barrier does not need credit to be transported, so as long as the sender lets the barrier overtake the output queue, the network stack transports the barrier immediately, regardless of the input conditions on the receiver side. So there is no requirement to consume the accumulated buffers in these channels with higher priority. If so, it seems we will not waste any CPU cycles, as Piotr was concerned about before.
> >> >>
> >> >> I'm not sure I follow this. For the checkpoint to complete, any buffer that arrived prior to the barrier would need to be part of the checkpointed state. So wouldn't it be important to finish persisting these buffers as fast as possible by prioritizing the respective inputs? The task won't be able to quickly process records from the inputs that have seen the barrier when it is already backpressured (or causing the backpressure).
> >> >>
> >> >> > 3. Supposing unaligned checkpoints perform well in practice, is it possible to make them the only mechanism for handling all cases? I mean, in the non-backpressure scenario there are few buffers (the input/output queues may even be empty), so "overtaking barrier -> trigger snapshot on first barrier -> persist buffers" might still work well. Then we would not need to maintain two mechanisms in the end.
> >> >> >
> >> >> > 4. The initial motivation of this discussion was checkpoint timeouts in backpressure scenarios. If we adjust the default timeout to a very big value, the checkpoint would never time out and we only need to wait for it to finish. Are there still other problems/concerns if a checkpoint takes a long time to finish? Although we already knew some issues before, it is better to gather more user feedback to confirm which aspects could be solved by this feature design; e.g., the sink commit delay might not be covered by the unaligned solution.
> >> >>
> >> >> Checkpoints taking too long is the concern that sparked this discussion (the timeout is just a symptom). The slowness issue also applies to the savepoint use case. We would need to be able to take a savepoint fast in order to roll forward a fix that can alleviate the backpressure (like changing parallelism or making a different configuration change).
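[Editor's note] The "take the snapshot on the first barrier, then log lagging channels instead of blocking them" scheme debated above can be sketched as a toy model. This is plain Java with hypothetical names for illustration only, not the actual Flink or Chandy-Lamport implementation:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model: snapshot operator state on the FIRST barrier, then keep logging
 * pre-barrier records from channels whose barrier has not arrived yet.
 * No channel is ever blocked.
 */
public class SnapshotOnFirstBarrier {
    private final int numChannels;
    private final Set<Integer> barrierSeen = new HashSet<>();
    // Records logged into the checkpoint as in-flight data.
    final List<String> channelLog = new ArrayList<>();
    boolean snapshotTaken = false;

    SnapshotOnFirstBarrier(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(int channel) {
        // The synchronous state snapshot happens on the very first barrier.
        snapshotTaken = true;
        barrierSeen.add(channel);
    }

    /** Processes a record; returns true if it was also logged. */
    boolean onRecord(int channel, String record) {
        if (snapshotTaken && !barrierSeen.contains(channel)) {
            // Pre-barrier record on a lagging channel: it belongs to the
            // checkpoint, so log it, but keep processing without blocking.
            channelLog.add(record);
            return true;
        }
        return false; // processed normally, not part of the checkpoint
    }

    boolean checkpointComplete() {
        return snapshotTaken && barrierSeen.size() == numChannels;
    }

    public static void main(String[] args) {
        SnapshotOnFirstBarrier task = new SnapshotOnFirstBarrier(2);
        task.onRecord(0, "a");   // before any barrier: not logged
        task.onBarrier(0);       // first barrier: snapshot state now
        task.onRecord(1, "b");   // lagging channel: logged as in-flight data
        task.onRecord(0, "c");   // channel already saw its barrier: not logged
        task.onBarrier(1);       // last barrier: checkpoint complete
        System.out.println(task.channelLog);           // [b]
        System.out.println(task.checkpointComplete()); // true
    }
}
```

Thomas's prioritization question then becomes: in what order the entries of `channelLog` get persisted, since the checkpoint cannot complete until that logging is durable.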
> >> >> >
> >> >> > Best,
> >> >> > Zhijiang
> >> >> > ------------------------------------------------------------------
> >> >> > From: Stephan Ewen <se...@apache.org>
> >> >> > Send Time: Wed, Aug 14, 2019, 17:43
> >> >> > To: dev <dev@flink.apache.org>
> >> >> > Subject: Re: Checkpointing under backpressure
> >> >> >
> >> >> > Quick note: the current implementation is
> >> >> >
> >> >> >     Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)
> >> >> >
> >> >> > On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote:
> >> >> >
> >> >> > > > Thanks for the great ideas so far.
> >> >> > >
> >> >> > > +1
> >> >> > >
> >> >> > > Regarding the other things raised, I mostly agree with Stephan.
> >> >> > >
> >> >> > > I like the idea of simultaneously starting the checkpoint everywhere via an RPC call (especially in cases where tasks are busy doing some synchronous operation, for example for tens of milliseconds; in that case every network exchange adds tens of milliseconds of delay in propagating the checkpoint). However, I agree that this might be a premature optimization given the current state of our code (we already have checkpoint barriers).
> >> >> > >
> >> >> > > However, I like the idea of switching from:
> >> >> > >
> >> >> > > 1. A -> S -> F (align -> snapshot -> forward markers)
> >> >> > >
> >> >> > > to
> >> >> > >
> >> >> > > 2. S -> F -> L (snapshot -> forward markers -> log pending channels)
> >> >> > >
> >> >> > > or even to
> >> >> > >
> >> >> > > 6.
F -> S -> L (forward markers -> snapshot -> log pending channels)
> >> >> > >
> >> >> > > It feels to me like this would decouple the propagation of checkpoints from the cost of the synchronous snapshots and from waiting for all of the checkpoint barriers to arrive (even if they overtake in-flight records, this might take some time).
> >> >> > >
> >> >> > > > What I like about the Chandy-Lamport approach (2.), initiated from sources, is that:
> >> >> > > > - Snapshotting imposes no modification to normal processing.
> >> >> > >
> >> >> > > Yes, I agree that would be nice. Currently, during the alignment and the blocking of input channels, we might be wasting CPU cycles of upstream tasks. If we succeed in designing a new checkpointing mechanism that does not disrupt/block regular data processing (modulo the extra I/O cost of logging the in-flight records), that would be a huge improvement.
> >> >> > >
> >> >> > > Piotrek
> >> >> > >
> >> >> > > > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote:
> >> >> > > >
> >> >> > > > Sure, I see. In cases where no periodic aligned snapshots are employed, this is the only option.
> >> >> > > >
> >> >> > > > Two things that were not highlighted enough so far about the proposed protocol (my mails included):
> >> >> > > > - The recovery/reconfiguration strategy should strictly prioritize processing logged events before entering normal task input operation. Otherwise causality can be violated. This also means dataflow recovery is expected to be slower than recovery from an aligned snapshot.
> >> >> > > > - As with state capture, markers should be forwarded upon the first marker received on input, no later than that. Otherwise we have duplicate side effects.
> >> >> > > >
> >> >> > > > Thanks for the great ideas so far.
> >> >> > > >
> >> >> > > > Paris
> >> >> > > >
> >> >> > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
> >> >> > > >>
> >> >> > > >> Scaling with unaligned checkpoints might be a necessity.
> >> >> > > >>
> >> >> > > >> Let's assume the job failed due to a lost TaskManager, but no new TaskManager becomes available. In that case we need to scale down based on the latest complete checkpoint, because we cannot produce a new checkpoint.
> >> >> > > >>
> >> >> > > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com> wrote:
> >> >> > > >>
> >> >> > > >>> +1 I think we are on the same page, Stephan.
> >> >> > > >>>
> >> >> > > >>> Rescaling on an unaligned checkpoint sounds challenging and a bit unnecessary. No? Why not stick to aligned snapshots for live reconfiguration/rescaling? It's a pretty rare operation and it would simplify things by a lot. Everything can be "staged" upon alignment, including replacing channels and tasks.
> >> >> > > >>>
> >> >> > > >>> -Paris
> >> >> > > >>>
> >> >> > > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> >> >> > > >>>>
> >> >> > > >>>> Hi all!
> >> >> > > >>>>
> >> >> > > >>>> Yes, the first proposal of "unaligned checkpoints" (probably two years back now) drew major inspiration from Chandy-Lamport, as did the original checkpointing algorithm.
> >> >> > > >>>>
> >> >> > > >>>> "Logging data between first and last barrier" versus "barrier jumping over buffers and storing those buffers" is pretty much the same. However, there are a few nice benefits of the unaligned checkpoints proposal over Chandy-Lamport.
> >> >> > > >>>>
> >> >> > > >>>> *## Benefits of Unaligned Checkpoints*
> >> >> > > >>>>
> >> >> > > >>>> (1) It is very similar to the original algorithm (it can be seen as an optional feature purely in the network stack) and thus can share lots of code paths.
> >> >> > > >>>>
> >> >> > > >>>> (2) Less data stored. If we make the "jump over buffers" part timeout-based (for example, the barrier overtakes buffers if they are not flushed within 10 ms), then in the common case of flowing pipelines checkpoints are aligned without in-flight data. Only backpressured cases store some in-flight data, which means we don't regress in the common case and only fix the backpressure case.
> >> >> > > >>>>
> >> >> > > >>>> (3) Faster checkpoints. Chandy-Lamport still waits for all barriers to arrive naturally, logging on the way. If data processing is slow, this can still take quite a while.
> >> >> > > >>>>
> >> >> > > >>>> ==> I think both these points are strong reasons not to change the mechanism away from "trigger sources" and switch to CL-style "trigger all".
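[Editor's note] Stephan's point (2), the timeout-based "jump over buffers" idea, can be illustrated with a toy model: if the output queue drains before the timeout, the checkpoint is effectively aligned with zero in-flight data; otherwise the barrier overtakes whatever remains. Plain Java with hypothetical names, not Flink internals:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model of a timeout-based barrier overtake: the queue gets a chance to
 * drain normally; when the timeout fires, the barrier jumps the queue and the
 * remaining buffers become part of the checkpoint state.
 */
public class TimeoutBarrierOvertake {

    /**
     * @param queued      records sitting in the output queue
     * @param drainBudget records the network can flush before the timeout
     * @return number of in-flight records the checkpoint must persist
     */
    static int persistedBuffers(int queued, int drainBudget) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < queued; i++) queue.add(i);
        // Flush normally until the timeout fires (modeled as a record budget).
        for (int i = 0; i < drainBudget && !queue.isEmpty(); i++) queue.poll();
        // Timeout: the barrier overtakes whatever is still queued, and those
        // buffers are persisted with the checkpoint.
        return queue.size();
    }

    public static void main(String[] args) {
        // Flowing pipeline: queue drains in time, aligned, nothing persisted.
        System.out.println(persistedBuffers(5, 10));  // 0
        // Backpressure: 95 buffers are overtaken and stored in the checkpoint.
        System.out.println(persistedBuffers(100, 5)); // 95
    }
}
```

This matches the argument in the list above: the common case degenerates to an aligned checkpoint, and only the backpressured case pays for storing in-flight data.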
> >> >> > > >>>>
> >> >> > > >>>> *## Possible ways to combine Chandy-Lamport and Unaligned Checkpoints*
> >> >> > > >>>>
> >> >> > > >>>> We can think about something like "take the state snapshot on the first barrier" and then store buffers until the other barriers arrive. Inside the network stack, barriers could still overtake and persist buffers. The benefit would be less latency increase in the channels which already have received barriers. However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning upgrades*
> >> >> > > >>>>
> >> >> > > >>>> I think it is a fair restriction to say that upgrades need to happen on aligned checkpoints. It is a rare enough operation.
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning re-scaling (changing parallelism)*
> >> >> > > >>>>
> >> >> > > >>>> We need to support that on unaligned checkpoints as well. There are several feature proposals about automatic scaling, especially downscaling in case of missing resources. The last snapshot might be a regular checkpoint, so all checkpoints need to support rescaling.
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning end-to-end checkpoint duration and "trigger sources" versus "trigger all"*
> >> >> > > >>>>
> >> >> > > >>>> I think for the end-to-end checkpoint duration, an "overtake buffers" approach yields faster checkpoints, as mentioned above (Chandy-Lamport logging still needs to wait for the barrier to flow).
> >> >> > > >>>>
> >> >> > > >>>> I don't see the benefit of a "trigger all tasks via RPC concurrently" approach. Bear in mind that it is still a globally coordinated approach, and you need to wait for the global checkpoint to complete before committing any side effects. I believe that the checkpoint time is determined more by the state checkpoint writing and the global coordination and metadata commit than by the difference in alignment time between "trigger from source and jump over buffers" and "trigger all tasks concurrently".
> >> >> > > >>>>
> >> >> > > >>>> Trying to optimize a few tens of milliseconds out of the network stack (and changing the overall checkpointing approach completely for that) while staying with a globally coordinated checkpoint would send us down a path to a dead end.
> >> >> > > >>>>
> >> >> > > >>>> To really bring task persistence latency down to tens of milliseconds (so we can commit frequently in sinks), we need an approach without any global coordination. Tasks need to establish a persistent recovery point individually and at their own discretion; only then can it be frequent enough. To get there, they would need to decouple themselves from the predecessor and successor tasks (via something like persistent channels). This is a different discussion, though, somewhat orthogonal to this one.
> >> >> > > >>>>
> >> >> > > >>>> Best,
> >> >> > > >>>> Stephan
> >> >> > > >>>>
> >> >> > > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> Hi again,
> >> >> > > >>>>>
> >> >> > > >>>>> Zhu Zhu, let me think about this more. Maybe, as Paris writes, we do not need to block any channels at all, at least assuming credit-based flow control. What should happen with the following checkpoint is another question. Also, should we support concurrent checkpoints and subsuming checkpoints as we do now? Maybe not…
> >> >> > > >>>>>
> >> >> > > >>>>> Paris,
> >> >> > > >>>>>
> >> >> > > >>>>> Re:
> >> >> > > >>>>> I. 2. a) and b) - yes, this would have to be taken into account.
> >> >> > > >>>>> I. 2. c) and IV. 2. - without those, the end-to-end checkpoint time will probably be longer than it could be. It might affect external systems, for example Kafka, which automatically times out lingering transactions; and for us, the transaction time is equal to the time between two checkpoints.
> >> >> > > >>>>>
> >> >> > > >>>>> II. 1. - I'm confused.
> >> >> > > >>>>> To make things straight: Flink currently snapshots once it receives all of the checkpoint barriers from all of the input channels, and only then does it broadcast the checkpoint barrier downstream. And this is correct from an exactly-once perspective.
> >> >> > > >>>>>
> >> >> > > >>>>> As far as I understand, your proposal based on the Chandy-Lamport algorithm snapshots the state of the operator on the first checkpoint barrier, which also looks correct to me.
> >> >> > > >>>>>
> >> >> > > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about this.
> >> >> > > >>>>>
> >> >> > > >>>>> V. Yes, we still need aligned checkpoints, as they are easier for state migration and upgrades.
> >> >> > > >>>>>
> >> >> > > >>>>> Piotrek
> >> >> > > >>>>>
> >> >> > > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> wrote:
> >> >> > > >>>>>>
> >> >> > > >>>>>> Now I see a little more clearly what you have in mind. Thanks for the explanation! There are a few intermixed concepts here, some to do with correctness, some with performance. Before delving deeper I will just enumerate a few things to make myself a little more helpful if I can.
> >> >> > > >>>>>>
> >> >> > > >>>>>> I. Initiation
> >> >> > > >>>>>> -------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1.
> >> >> > > >>>>>> RPC to sources only is a less intrusive way to initiate snapshots, since you utilize pipeline parallelism better (only a small subset of tasks is progressively running the protocol at a time; if snapshotting is async, the overall overhead might not even be observable).
> >> >> > > >>>>>>
> >> >> > > >>>>>> 2. If we really want an RPC-to-all initiation, take notice of the following implications:
> >> >> > > >>>>>>
> >> >> > > >>>>>> a. (correctness) RPC calls are not guaranteed to arrive in every task before a marker from a preceding task.
> >> >> > > >>>>>>
> >> >> > > >>>>>> b. (correctness) Either the RPC call OR the first arriving marker should initiate the algorithm, whichever comes first. If you only do it per RPC call, then you capture a "late" state that includes side effects of already-logged events.
> >> >> > > >>>>>>
> >> >> > > >>>>>> c. (performance) Lots of I/O will be invoked at the same time on the backend store from all tasks. This might lead to high congestion in async snapshots.
> >> >> > > >>>>>>
> >> >> > > >>>>>> II. Capturing State First
> >> >> > > >>>>>> -------------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. (correctness) Capturing state at the last marker sounds incorrect to me (the state contains side effects of already-logged events under the proposed scheme). This results in duplicate processing. No?
> >> >> > > >>>>>>
> >> >> > > >>>>>> III. Channel Blocking / "Alignment"
> >> >> > > >>>>>> -----------------------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1.
(performance?) What is the added benefit? We dont want > a > >> >> > > "complete" > >> >> > > >>>>> transactional snapshot, async snapshots are purely for > >> >> > > failure-recovery. > >> >> > > >>>>> Thus, I dont see why this needs to be imposed at the > expense > >> of > >> >> > > >>>>> performance/throughput. With the proposed scheme the whole > >> >> dataflow > >> >> > > >>> anyway > >> >> > > >>>>> enters snapshotting/logging mode so tasks more or less > >> snapshot > >> >> > > >>>>> concurrently. > >> >> > > >>>>>> > >> >> > > >>>>>> IV Marker Bypassing > >> >> > > >>>>>> ------------------- > >> >> > > >>>>>> > >> >> > > >>>>>> 1. (correctness) This leads to equivalent in-flight > >> snapshots > >> >> so > >> >> > > with > >> >> > > >>>>> some quick thinking correct. I will try to model this > later > >> and > >> >> > get > >> >> > > >>> back > >> >> > > >>>>> to you in case I find something wrong. > >> >> > > >>>>>> > >> >> > > >>>>>> 2. (performance) It also sounds like a meaningful > >> >> optimisation! I > >> >> > > like > >> >> > > >>>>> thinking of this as a push-based snapshot. i.e., the > >> producing > >> >> task > >> >> > > >>> somehow > >> >> > > >>>>> triggers forward a consumer/channel to capture its state. > By > >> >> > example > >> >> > > >>>>> consider T1 -> |marker t1| -> T2. > >> >> > > >>>>>> > >> >> > > >>>>>> V. Usage of "Async" Snapshots > >> >> > > >>>>>> --------------------- > >> >> > > >>>>>> > >> >> > > >>>>>> 1. Do you see this as a full replacement of "full" aligned > >> >> > > >>>>> snapshots/savepoints? In my view async shanpshots will be > >> needed > >> >> > from > >> >> > > >>> time > >> >> > > >>>>> to time but not as frequently. Yet, it seems like a valid > >> >> approach > >> >> > > >>> solely > >> >> > > >>>>> for failure-recovery on the same configuration. Here's why: > >> >> > > >>>>>> > >> >> > > >>>>>> a. 
With original snapshotting there is a strong duality > >> >> between > >> >> > > >>>>>> a stream input (offsets) and committed side effects > >> >> (internal > >> >> > > >>>>> states and external commits to transactional sinks). While > in > >> >> the > >> >> > > async > >> >> > > >>>>> version, there are uncommitted operations (inflight > records). > >> >> Thus, > >> >> > > you > >> >> > > >>>>> cannot use these snapshots for e.g., submitting sql queries > >> with > >> >> > > >>> snapshot > >> >> > > >>>>> isolation. Also, the original snapshotting gives a lot of > >> >> potential > >> >> > > for > >> >> > > >>>>> flink to make proper transactional commits externally. > >> >> > > >>>>>> > >> >> > > >>>>>> b. Reconfiguration is very tricky, you probably know > that > >> >> > better. > >> >> > > >>>>> Inflight channel state is no longer valid in a new > >> configuration > >> >> > > (i.e., > >> >> > > >>> new > >> >> > > >>>>> dataflow graph, new operators, updated operator logic, > >> different > >> >> > > >>> channels, > >> >> > > >>>>> different parallelism) > >> >> > > >>>>>> > >> >> > > >>>>>> 2. Async snapshots can also be potentially useful for > >> >> monitoring > >> >> > the > >> >> > > >>>>> general health of a dataflow since they can be analyzed by > >> the > >> >> task > >> >> > > >>> manager > >> >> > > >>>>> about the general performance of a job graph and spot > >> >> bottlenecks > >> >> > for > >> >> > > >>>>> example. > >> >> > > >>>>>> > >> >> > > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski < > >> pi...@ververica.com > >> >> > > >> >> > > wrote: > >> >> > > >>>>>>> > >> >> > > >>>>>>> Hi, > >> >> > > >>>>>>> > >> >> > > >>>>>>> Thomas: > >> >> > > >>>>>>> There are no Jira tickets yet (or maybe there is > something > >> >> very > >> >> > old > >> >> > > >>>>> somewhere). 
First we want to discuss it, next present a FLIP, and at last create tickets :)

> if I understand correctly, then the proposal is to not block any
> input channel at all, but only log data from the backpressured channel (and
> make it part of the snapshot) until the barrier arrives

I would guess that it would be better to block the reads, unless we can already process the records from the blocked channel…

Paris:

Thanks for the explanation Paris. I'm starting to understand this more, and I like the idea of snapshotting the state of an operator before receiving all of the checkpoint barriers - this would allow more things to happen at the same time instead of sequentially. As Zhijiang has pointed out, there are some things not considered in your proposal (overtaking output buffers), but maybe those things could be incorporated together.

Another thing is that from the wiki description I understood that the initial checkpointing is not initiated by any checkpoint barrier, but by an independent call/message from the Observer. I haven't played with this idea a lot, but I had some discussion with Nico and it seems that it might work:

1. JobManager sends an RPC "start checkpoint" to all tasks.
2. A Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a snapshot of its state and:
   a) broadcasts a checkpoint barrier down the stream to all channels (let's ignore for a moment the potential for this barrier to overtake the buffered output data)
   b) for any input channel for which it hasn't yet received a checkpoint barrier, the data are added to the checkpoint
   c) once a channel (for example l1) receives a checkpoint barrier, the Task blocks reads from that channel (?)
   d) after all remaining channels (l2) receive checkpoint barriers, the Task first has to process the buffered data; after that it can unblock the reads from the channels

Checkpoint barriers do not cascade/flow through different tasks here. A checkpoint barrier emitted from Task1 reaches only the immediate downstream Tasks. Thanks to this setup, the total checkpointing time is not the sum of the checkpointing times of all Tasks one by one, but more or less the max over the slowest Tasks. Right?

A couple of intriguing thoughts are:
3. checkpoint barriers overtaking the output buffers
4. can we keep processing some data (in order to not waste CPU cycles) after we have taken the snapshot of the Task? I think we could.
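The steps above can be condensed into a small state machine. The sketch below is only an illustrative model of the proposed protocol, not Flink code; the class, method, and channel names are all invented for the example.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustrative model of the RPC-initiated checkpoint sketched in steps 1-2d
// above. This is NOT Flink code; every name here is invented for the example.
class RpcInitiatedTask {
    enum ChannelState { OPEN, LOGGING, BLOCKED }

    private final Map<String, ChannelState> channels = new HashMap<>();
    private final Queue<String> checkpointedData = new ArrayDeque<>();

    RpcInitiatedTask(String... channelIds) {
        for (String id : channelIds) channels.put(id, ChannelState.OPEN);
    }

    // Step 1/2: "start checkpoint" RPC from the JobManager arrives; the task
    // snapshots its state (elided) and broadcasts a barrier downstream (2a).
    void onStartCheckpointRpc() {
        channels.replaceAll((id, s) -> ChannelState.LOGGING);
    }

    // 2b: data on a channel that has not yet delivered its barrier is added
    // to the checkpoint. (Normal processing on OPEN channels is not modeled.)
    void onData(String channel, String record) {
        if (channels.get(channel) == ChannelState.LOGGING) {
            checkpointedData.add(record);
        }
    }

    // 2c: a barrier arrived, so reads from this channel are blocked.
    // 2d: once every channel has delivered its barrier, unblock all reads.
    void onBarrier(String channel) {
        channels.put(channel, ChannelState.BLOCKED);
        if (!channels.containsValue(ChannelState.LOGGING)) {
            channels.replaceAll((id, s) -> ChannelState.OPEN);
        }
    }

    boolean checkpointInProgress() {
        return channels.containsValue(ChannelState.LOGGING)
                || channels.containsValue(ChannelState.BLOCKED);
    }

    Queue<String> checkpointedData() { return checkpointedData; }

    public static void main(String[] args) {
        RpcInitiatedTask task = new RpcInitiatedTask("l1", "l2");
        task.onStartCheckpointRpc();
        task.onData("l1", "x");   // no barrier on l1 yet: goes into checkpoint
        task.onBarrier("l1");     // l1 blocked, l2 still logging
        task.onData("l2", "y");   // still logged for the checkpoint
        task.onBarrier("l2");     // all barriers in: checkpoint complete
        System.out.println(task.checkpointInProgress() + " " + task.checkpointedData());
    }
}
```

The model deliberately skips the operator state snapshot and barrier broadcast; it only tracks which channels still feed data into the checkpoint versus which are blocked.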
Piotrek

On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:

Great discussion! I'm excited that this is already under consideration! Are there any JIRAs or other traces of discussion to follow?

Paris, if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives? This is intriguing. But probably there is also a benefit to not continue reading I1, since that could speed up retrieval from I2. Also, if the user code is the cause of backpressure, this would avoid pumping more data into the process function.

Thanks,
Thomas

On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:

Hi Paris,

Thanks for the detailed sharing. I think it is very similar to the way of overtaking we proposed before.

There are some tiny differences:
- The way of overtaking might need to snapshot all the input/output queues. Chandy-Lamport seems to only need to snapshot (n-1) input channels after the first barrier arrives, which might reduce the state size a bit. But normally there should be fewer buffers in the first input channel with the barrier anyway.
- The output barrier still follows the regular data stream in Chandy-Lamport, the same way as in current Flink. For the overtaking way, we need to pay extra effort to make the barrier transport first, ahead of the output queue on the upstream side, and to change barrier alignment to be based on receiving instead of the current reading on the downstream side.
- In backpressure caused by data skew, the first barrier in an almost empty input channel should arrive much earlier than in the last heavily loaded input channel, so Chandy-Lamport could benefit well. But for the case of all balanced, heavily loaded input channels, the first arrived barrier might still take much time, so the overtaking way could still fit well to speed up the checkpoint.

Anyway, your proposed suggestion is helpful on my side, especially considering some implementation details.

Best,
Zhijiang

------------------------------------------------------------------
From: Paris Carbone <seniorcarb...@gmail.com>
Send Time: Tue, 13 Aug 2019, 14:03
To: dev <dev@flink.apache.org>
Cc: zhijiang <wangzhijiang...@aliyun.com>
Subject: Re: Checkpointing under backpressure

Yes! It's quite similar, I think. Though mind that the devil is in the details, i.e., the temporal order in which actions are taken.

To clarify, let us say you have a task T with two input channels, I1 and I2. The Chandy-Lamport execution flow is the following:

1) T receives a barrier from I1 and...
2) ...the following three actions happen atomically:
   I)   T snapshots its state T*
   II)  T forwards the marker to its outputs
   III) T starts logging all events of I2 (only) into a buffer M*
   - Also notice here that T does NOT block I1 as it does in aligned snapshots -
3) Eventually T receives the barrier from I2 and stops recording events. Its asynchronously captured snapshot is now complete: {T*, M*}. Upon recovery, all messages of M* should be replayed in FIFO order.
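The three-step flow above can be modeled in a few lines. The sketch below is an illustrative simulation of the described Chandy-Lamport behavior for one task with two input channels; it is not Flink code, and all names are invented for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative simulation of the Chandy-Lamport flow described above for one
// task. NOT Flink code; all names are invented.
class ChandyLamportTask {
    private final Set<String> allChannels;
    private final Set<String> barrierSeen = new HashSet<>();
    private final List<String> channelLog = new ArrayList<>(); // M*
    private String stateSnapshot;                              // T*
    private String state = "";

    ChandyLamportTask(Set<String> channels) { this.allChannels = channels; }

    void onRecord(String channel, String record) {
        state += record; // records are processed normally; no channel blocks
        // After the first barrier (2.III), records from channels whose
        // barrier has not yet arrived are additionally logged into M*.
        if (stateSnapshot != null && !barrierSeen.contains(channel)) {
            channelLog.add(record);
        }
    }

    void onBarrier(String channel) {
        if (stateSnapshot == null) {
            stateSnapshot = state; // 2.I: capture T* on the FIRST barrier
            // 2.II (forwarding the marker downstream) is omitted here
        }
        barrierSeen.add(channel);  // 3: stop recording for this channel
    }

    boolean snapshotComplete() { return barrierSeen.equals(allChannels); }
    String snapshotState()     { return stateSnapshot; }
    List<String> snapshotLog() { return channelLog; }

    public static void main(String[] args) {
        ChandyLamportTask t = new ChandyLamportTask(Set.of("I1", "I2"));
        t.onRecord("I1", "a");
        t.onBarrier("I1");      // first barrier: T* captured
        t.onRecord("I2", "b");  // in-flight on I2: logged into M*
        t.onRecord("I1", "c");  // post-barrier on I1: processed, not logged
        t.onBarrier("I2");      // snapshot {T*, M*} now complete
        System.out.println(t.snapshotComplete() + " " + t.snapshotState() + " " + t.snapshotLog());
    }
}
```

Note how no channel is ever blocked: the post-barrier record on I1 is processed but excluded from the snapshot, while the pre-barrier record on I2 lands in M* for FIFO replay on recovery.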
With this approach, alignment does not create a deadlock situation, since 2.II happens asynchronously anyway and messages can likewise be logged asynchronously while the snapshot is in progress. If there is backpressure in a pipeline, the cause is most probably not this algorithm.

Back to your observation, the answer is: yes and no. In your network model, I can see the logic of "logging" and "committing" a final snapshot being provided by the channel implementation. However, do mind that the first barrier always needs to go "all the way" to initiate the Chandy-Lamport algorithm logic.

The above flow has been proven using temporal logic in my PhD thesis, in case you are interested in the proof.

I hope this helps clarify things a little. Let me know if there is any confusing point to disambiguate. I would be more than happy to help if I can.

Paris

On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:

Thanks for the input. Regarding the Chandy-Lamport snapshots: don't you still have to wait for the "checkpoint barrier" to arrive in order to know when you have received all possible messages from the upstream tasks/operators? So instead of processing the "in flight" messages (as Flink is doing currently), you are sending them to an "observer"?

In that case, that sounds similar to "checkpoint barriers overtaking in-flight records" (aka unaligned checkpoints). Just for us, the observer is a snapshot state.

Piotrek

On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:

Interesting problem! Thanks for bringing it up Thomas.

Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly, without sacrificing correctness:
- They do not need alignment, only (async) logging of in-flight records between the time the first barrier is processed and the time the last barrier arrives in a task.
- They work fine for failure recovery as long as logged records are replayed on startup.

Flink's "aligned" savepoints would probably still be necessary for transactional sink commits + any sort of reconfiguration (e.g., rescaling, updating the logic of operators to evolve an application, etc.).

I don't completely understand the "overtaking" approach, but if you have a concrete definition I would be happy to check it out and help if I can! Mind that Chandy-Lamport essentially does this by logging things in pending channels in a task snapshot before the barrier arrives.

-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm

On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:

Hi Thomas,

As Zhijiang has responded, we are now in the process of discussing how to address this issue, and one of the solutions we are discussing is exactly what you are proposing: checkpoint barriers overtaking the in-flight data, and making the in-flight data part of the checkpoint.

If everything works well, we will be able to present the results of our discussions on the dev mailing list soon.

Piotrek

On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:

Hi Thomas,

Thanks for raising this concern. Barrier alignment takes a long time in the backpressure case, which can cause several problems:
1. Checkpoint timeouts, as you mentioned.
2. High recovery cost on failover, because much data needs to be replayed.
3. High delay for commit-based sinks in exactly-once mode.

With credit-based flow control (since release-1.5), the amount of in-flight buffers before barrier alignment is reduced, so we get some benefit in terms of checkpoint speed.

In release-1.8, I guess we did not suspend the channels which have already received the barrier in practice. But we actually did a similar thing to speed up barrier alignment before. I am not quite sure whether release-1.8 covers this feature.
There were some relevant discussions under jira [1].

For release-1.10, the community is now discussing the feature of unaligned checkpoints, which mainly aims to resolve the above concerns. The basic idea is to make the barrier overtake the output/input buffer queues to speed up alignment, and to snapshot the input/output buffers as part of the checkpoint state. The details have not been confirmed yet and are still under discussion. I wish we could make some improvements for release-1.10.

[1] https://issues.apache.org/jira/browse/FLINK-8523

Best,
Zhijiang

------------------------------------------------------------------
From: Thomas Weise <t...@apache.org>
Send Time: Mon, 12 Aug 2019, 21:38
To: dev <dev@flink.apache.org>
Subject: Checkpointing under backpressure

Hi,

One of the major operational difficulties we observe with Flink are checkpoint timeouts under backpressure. I'm looking both for confirmation of my understanding of the current behavior and for pointers towards future improvement work:

Prior to the introduction of credit-based flow control in the network stack [1][2], checkpoint barriers would back up with the data for all logical channels due to TCP backpressure. Since Flink 1.5, the buffers are controlled per channel, and checkpoint barriers are only held back for channels that have backpressure, while others can continue processing normally. However, checkpoint barriers still cannot "overtake data"; therefore checkpoint alignment remains affected for the channel with backpressure, with the potential for slow checkpointing and timeouts. Albeit the delay of barriers is capped by the maximum number of in-transit buffers per channel, resulting in an improvement compared to previous versions of Flink. Also, the backpressure-based checkpoint alignment can help the barrier advance faster on the receiver side (by suspending channels that have already delivered the barrier). Is that accurate as of Flink 1.8?

What appears to be missing to completely unblock checkpointing is a mechanism for checkpoints to overtake the data. That would help in situations where the processing itself is the bottleneck and prioritization in the network stack alone cannot address the barrier delay. Was there any related discussion? One possible solution would be to drain incoming data up to the barrier and make it part of the checkpoint instead of processing it. This is somewhat related to asynchronous processing, but I'm thinking more of a solution that is automated in the Flink runtime for the backpressure scenario only.
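The "overtaking" idea can be illustrated with a toy channel model: in aligned mode the barrier queues behind every buffered record, while in overtaking (unaligned) mode it jumps to the head of the queue and the records it overtakes are persisted with the checkpoint for replay on recovery. This is an invented sketch, not Flink's actual network-stack code; all names are made up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model contrasting aligned barriers with barriers that overtake
// in-flight data. NOT Flink's network stack; all names are invented.
class OvertakingChannel {
    static final String BARRIER = "<barrier>";

    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> checkpointedInFlight = new ArrayList<>();

    void enqueueData(String record) { queue.addLast(record); }

    // Aligned mode: the barrier waits behind every buffered record.
    void enqueueBarrierAligned() { queue.addLast(BARRIER); }

    // Overtaking ("unaligned") mode: the barrier jumps to the head of the
    // queue; the records it overtakes are captured as checkpoint state so
    // they can be replayed on recovery instead of being processed first.
    void enqueueBarrierUnaligned() {
        checkpointedInFlight.addAll(queue);
        queue.addFirst(BARRIER);
    }

    String poll() { return queue.pollFirst(); }
    List<String> checkpointedInFlight() { return checkpointedInFlight; }

    public static void main(String[] args) {
        OvertakingChannel channel = new OvertakingChannel();
        channel.enqueueData("d1");
        channel.enqueueData("d2");
        channel.enqueueBarrierUnaligned();
        // The receiver sees the barrier immediately; d1/d2 are in the snapshot.
        System.out.println(channel.poll() + " " + channel.checkpointedInFlight());
    }
}
```

The point of the contrast: in the unaligned variant the barrier's delay no longer depends on how much data is buffered ahead of it, at the cost of a larger checkpoint that includes the overtaken in-flight records.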
Thanks,
Thomas

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn

--
Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng