Quick note: The current implementation is

Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)

On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote:

> > Thanks for the great ideas so far.
>
> +1
>
> Regarding other things raised, I mostly agree with Stephan.
>
> I like the idea of simultaneously starting the checkpoint everywhere via
> RPC call (especially in cases where Tasks are busy doing some synchronous
> operations for example for tens of milliseconds. In that case every network
> exchange adds tens of milliseconds of delay in propagating the checkpoint).
> However I agree that this might be a premature optimisation assuming the
> current state of our code (we already have checkpoint barriers).
>
> However I like the idea of switching from:
>
> 1. A -> S -> F (Align -> snapshot -> forward markers)
>
> To
>
> 2. S -> F -> L (Snapshot -> forward markers -> log pending channels)
>
> Or even to
>
> 6. F -> S -> L (Forward markers -> snapshot -> log pending channels)
>
> It feels to me like this would decouple propagation of checkpoints from
> costs of synchronous snapshots and waiting for all of the checkpoint
> barriers to arrive (even if they will overtake in-flight records, this
> might take some time).
>
> > What I like about the Chandy Lamport approach (2.) initiated from
> sources is that:
> >       - Snapshotting imposes no modification to normal processing.
>
> Yes, I agree that would be nice. Currently, during the alignment and
> blocking of the input channels, we might be wasting CPU cycles of up stream
> tasks. If we succeed in designing new checkpointing mechanism to not
> disrupt/block regular data processing (% the extra IO cost for logging the
> in-flight records), that would be a huge improvement.
>
> Piotrek
>
> > On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote:
> >
> > Sure I see. In cases when no periodic aligned snapshots are employed
> this is the only option.
> >
> > Two things that were not highlighted enough so far on the proposed
> protocol (included my mails):
> >       - The Recovery/Reconfiguration strategy should strictly prioritise
> processing logged events before entering normal task input operation.
> Otherwise causality can be violated. This also means dataflow recovery will
> be expected to be slower to the one employed on an aligned snapshot.
> >       - Same as with state capture, markers should be forwarded upon
> first marker received on input. No later than that. Otherwise we have
> duplicate side effects.
> >
> > Thanks for the great ideas so far.
> >
> > Paris
> >
> >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Scaling with unaligned checkpoints might be a necessity.
> >>
> >> Let's assume the job failed due to a lost TaskManager, but no new
> >> TaskManager becomes available.
> >> In that case we need to scale down based on the latest complete
> checkpoint,
> >> because we cannot produce a new checkpoint.
> >>
> >>
> >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com>
> >> wrote:
> >>
> >>> +1 I think we are on the same page Stephan.
> >>>
> >>> Rescaling on unaligned checkpoint sounds challenging and a bit
> >>> unnecessary. No?
> >>> Why not sticking to aligned snapshots for live
> reconfiguration/rescaling?
> >>> It’s a pretty rare operation and it would simplify things by a lot.
> >>> Everything can be “staged” upon alignment including replacing channels
> and
> >>> tasks.
> >>>
> >>> -Paris
> >>>
> >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:
> >>>>
> >>>> Hi all!
> >>>>
> >>>> Yes, the first proposal of "unaligend checkpoints" (probably two years
> >>> back
> >>>> now) drew a major inspiration from Chandy Lamport, as did actually the
> >>>> original checkpointing algorithm.
> >>>>
> >>>> "Logging data between first and last barrier" versus "barrier jumping
> >>> over
> >>>> buffer and storing those buffers" is pretty close same.
> >>>> However, there are a few nice benefits of the proposal of unaligned
> >>>> checkpoints over Chandy-Lamport.
> >>>>
> >>>> *## Benefits of Unaligned Checkpoints*
> >>>>
> >>>> (1) It is very similar to the original algorithm (can be seen an an
> >>>> optional feature purely in the network stack) and thus can share
> lot's of
> >>>> code paths.
> >>>>
> >>>> (2) Less data stored. If we make the "jump over buffers" part timeout
> >>> based
> >>>> (for example barrier overtakes buffers if not flushed within 10ms)
> then
> >>>> checkpoints are in the common case of flowing pipelines aligned
> without
> >>>> in-flight data. Only back pressured cases store some in-flight data,
> >>> which
> >>>> means we don't regress in the common case and only fix the back
> pressure
> >>>> case.
> >>>>
> >>>> (3) Faster checkpoints. Chandy Lamport still waits for all barriers to
> >>>> arrive naturally, logging on the way. If data processing is slow, this
> >>> can
> >>>> still take quite a while.
> >>>>
> >>>> ==> I think both these points are strong reasons to not change the
> >>>> mechanism away from "trigger sources" and start with CL-style "trigger
> >>> all".
> >>>>
> >>>>
> >>>> *## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*
> >>>>
> >>>> We can think about something like "take state snapshot on first
> barrier"
> >>>> and then store buffers until the other barriers arrive. Inside the
> >>> network
> >>>> stack, barriers could still overtake and persist buffers.
> >>>> The benefit would be less latency increase in the channels which
> already
> >>>> have received barriers.
> >>>> However, as mentioned before, not prioritizing the inputs from which
> >>>> barriers are still missing can also have an adverse effect.
> >>>>
> >>>>
> >>>> *## Concerning upgrades*
> >>>>
> >>>> I think it is a fair restriction to say that upgrades need to happen
> on
> >>>> aligned checkpoints. It is a rare enough operation.
> >>>>
> >>>>
> >>>> *## Concerning re-scaling (changing parallelism)*
> >>>>
> >>>> We need to support that on unaligned checkpoints as well. There are
> >>> several
> >>>> feature proposals about automatic scaling, especially down scaling in
> >>> case
> >>>> of missing resources. The last snapshot might be a regular
> checkpoint, so
> >>>> all checkpoints need to support rescaling.
> >>>>
> >>>>
> >>>> *## Concerning end-to-end checkpoint duration and "trigger sources"
> >>> versus
> >>>> "trigger all"*
> >>>>
> >>>> I think for the end-to-end checkpoint duration, an "overtake buffers"
> >>>> approach yields faster checkpoints, as mentioned above (Chandy Lamport
> >>>> logging still needs to wait for barrier to flow).
> >>>>
> >>>> I don't see the benefit of a "trigger all tasks via RPC concurrently"
> >>>> approach. Bear in mind that it is still a globally coordinated
> approach
> >>> and
> >>>> you need to wait for the global checkpoint to complete before
> committing
> >>>> any side effects.
> >>>> I believe that the checkpoint time is more determined by the state
> >>>> checkpoint writing, and the global coordination and metadata commit,
> than
> >>>> by the difference in alignment time between "trigger from source and
> jump
> >>>> over buffers" versus "trigger all tasks concurrently".
> >>>>
> >>>> Trying to optimize a few tens of milliseconds out of the network stack
> >>>> sends (and changing the overall checkpointing approach completely for
> >>> that)
> >>>> while staying with a globally coordinated checkpoint will send us
> down a
> >>>> path to a dead end.
> >>>>
> >>>> To really bring task persistence latency down to 10s of milliseconds
> (so
> >>> we
> >>>> can frequently commit in sinks), we need to take an approach without
> any
> >>>> global coordination. Tasks need to establish a persistent recovery
> point
> >>>> individually and at their own discretion, only then can it be frequent
> >>>> enough. To get there, they would need to decouple themselves from the
> >>>> predecessor and successor tasks (via something like persistent
> channels).
> >>>> This is a different discussion, though, somewhat orthogonal to this
> one
> >>>> here.
> >>>>
> >>>> Best,
> >>>> Stephan
> >>>>
> >>>>
> >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com>
> >>> wrote:
> >>>>
> >>>>> Hi again,
> >>>>>
> >>>>> Zhu Zhu let me think about this more. Maybe as Paris is writing, we
> do
> >>> not
> >>>>> need to block any channels at all, at least assuming credit base flow
> >>>>> control. Regarding what should happen with the following checkpoint
> is
> >>>>> another question. Also, should we support concurrent checkpoints and
> >>>>> subsuming checkpoints as we do now? Maybe not…
> >>>>>
> >>>>> Paris
> >>>>>
> >>>>> Re
> >>>>> I. 2. a) and b) - yes, this would have to be taken into an account
> >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint time will
> >>>>> probably be longer than it could be. It might affect external
> systems.
> >>> For
> >>>>> example Kafka, which automatically time outs lingering transactions,
> and
> >>>>> for us, the transaction time is equal to the time between two
> >>> checkpoints.
> >>>>>
> >>>>> II 1. - I’m confused. To make things straight. Flink is currently
> >>>>> snapshotting once it receives all of the checkpoint barriers from
> all of
> >>>>> the input channels and only then it broadcasts the checkpoint barrier
> >>> down
> >>>>> the stream. And this is correct from exactly-once perspective.
> >>>>>
> >>>>> As far as I understand, your proposal based on Chandy Lamport
> algorithm,
> >>>>> is snapshotting the state of the operator on the first checkpoint
> >>> barrier,
> >>>>> which also looks correct to me.
> >>>>>
> >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more about
> this.
> >>>>>
> >>>>> V. Yes, we still need aligned checkpoints, as they are easier for
> state
> >>>>> migration and upgrades.
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com>
> >>> wrote:
> >>>>>>
> >>>>>> Now I see a little more clearly what you have in mind. Thanks for
> the
> >>>>> explanation!
> >>>>>> There are a few intermixed concepts here, some how to do with
> >>>>> correctness some with performance.
> >>>>>> Before delving deeper I will just enumerate a few things to make
> myself
> >>>>> a little more helpful if I can.
> >>>>>>
> >>>>>> I. Initiation
> >>>>>> -------------
> >>>>>>
> >>>>>> 1. RPC to sources only is a less intrusive way to initiate snapshots
> >>>>> since you utilize better pipeline parallelism (only a small subset of
> >>> tasks
> >>>>> is running progressively the protocol at a time, if snapshotting is
> >>> async
> >>>>> the overall overhead might not even be observable).
> >>>>>>
> >>>>>> 2. If we really want an RPC to all initiation take notice of the
> >>>>> following implications:
> >>>>>>
> >>>>>>    a. (correctness) RPC calls are not guaranteed to arrive in every
> >>>>> task before a marker from a preceding task.
> >>>>>>
> >>>>>>    b. (correctness) Either the RPC call OR the first arriving marker
> >>>>> should initiate the algorithm. Whichever comes first. If you only do
> it
> >>> per
> >>>>> RPC call then you capture a "late" state that includes side effects
> of
> >>>>> already logged events.
> >>>>>>
> >>>>>>    c. (performance) Lots of IO will be invoked at the same time on
> >>>>> the backend store from all tasks. This might lead to high congestion
> in
> >>>>> async snapshots.
> >>>>>>
> >>>>>> II. Capturing State First
> >>>>>> -------------------------
> >>>>>>
> >>>>>> 1. (correctness) Capturing state at the last marker sounds
> incorrect to
> >>>>> me (state contains side effects of already logged events based on the
> >>>>> proposed scheme). This results into duplicate processing. No?
> >>>>>>
> >>>>>> III. Channel Blocking / "Alignment"
> >>>>>> -----------------------------------
> >>>>>>
> >>>>>> 1. (performance?) What is the added benefit? We dont want a
> "complete"
> >>>>> transactional snapshot, async snapshots are purely for
> failure-recovery.
> >>>>> Thus, I dont see why this needs to be imposed at the expense of
> >>>>> performance/throughput. With the proposed scheme the whole dataflow
> >>> anyway
> >>>>> enters snapshotting/logging mode so tasks more or less snapshot
> >>>>> concurrently.
> >>>>>>
> >>>>>> IV Marker Bypassing
> >>>>>> -------------------
> >>>>>>
> >>>>>> 1. (correctness) This leads to equivalent in-flight snapshots so
> with
> >>>>> some quick thinking  correct. I will try to model this later and get
> >>> back
> >>>>> to you in case I find something wrong.
> >>>>>>
> >>>>>> 2. (performance) It also sounds like a meaningful optimisation! I
> like
> >>>>> thinking of this as a push-based snapshot. i.e., the producing task
> >>> somehow
> >>>>> triggers forward a consumer/channel to capture its state. By example
> >>>>> consider T1 -> |marker t1| -> T2.
> >>>>>>
> >>>>>> V. Usage of "Async" Snapshots
> >>>>>> ---------------------
> >>>>>>
> >>>>>> 1. Do you see this as a full replacement of "full" aligned
> >>>>> snapshots/savepoints? In my view async shanpshots will be needed from
> >>> time
> >>>>> to time but not as frequently. Yet, it seems like a valid approach
> >>> solely
> >>>>> for failure-recovery on the same configuration. Here's why:
> >>>>>>
> >>>>>>    a. With original snapshotting there is a strong duality between
> >>>>>>    a stream input (offsets) and committed side effects (internal
> >>>>> states and external commits to transactional sinks). While in the
> async
> >>>>> version, there are uncommitted operations (inflight records). Thus,
> you
> >>>>> cannot use these snapshots for e.g., submitting sql queries with
> >>> snapshot
> >>>>> isolation. Also, the original snapshotting gives a lot of potential
> for
> >>>>> flink to make proper transactional commits externally.
> >>>>>>
> >>>>>>    b. Reconfiguration is very tricky, you probably know that better.
> >>>>> Inflight channel state is no longer valid in a new configuration
> (i.e.,
> >>> new
> >>>>> dataflow graph, new operators, updated operator logic, different
> >>> channels,
> >>>>> different parallelism)
> >>>>>>
> >>>>>> 2. Async snapshots can also be potentially useful for monitoring the
> >>>>> general health of a dataflow since they can be analyzed by the task
> >>> manager
> >>>>> about the general performance of a job graph and spot bottlenecks for
> >>>>> example.
> >>>>>>
> >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thomas:
> >>>>>>> There are no Jira tickets yet (or maybe there is something very old
> >>>>> somewhere). First we want to discuss it, next present FLIP and at
> last
> >>>>> create tickets :)
> >>>>>>>
> >>>>>>>> if I understand correctly, then the proposal is to not block any
> >>>>>>>> input channel at all, but only log data from the backpressured
> >>> channel
> >>>>> (and
> >>>>>>>> make it part of the snapshot) until the barrier arrives
> >>>>>>>
> >>>>>>> I would guess that it would be better to block the reads, unless we
> >>> can
> >>>>> already process the records from the blocked channel…
> >>>>>>>
> >>>>>>> Paris:
> >>>>>>>
> >>>>>>> Thanks for the explanation Paris. I’m starting to understand this
> more
> >>>>> and I like the idea of snapshotting the state of an operator before
> >>>>> receiving all of the checkpoint barriers - this would allow more
> things
> >>> to
> >>>>> happen at the same time instead of sequentially. As Zhijiang has
> pointed
> >>>>> out there are some things not considered in your proposal: overtaking
> >>>>> output buffers, but maybe those things could be incorporated
> together.
> >>>>>>>
> >>>>>>> Another thing is that from the wiki description I understood that
> the
> >>>>> initial checkpointing is not initialised by any checkpoint barrier,
> but
> >>> by
> >>>>> an independent call/message from the Observer. I haven’t played with
> >>> this
> >>>>> idea a lot, but I had some discussion with Nico and it seems that it
> >>> might
> >>>>> work:
> >>>>>>>
> >>>>>>> 1. JobManager sends and RPC “start checkpoint” to all tasks
> >>>>>>> 2. Task (with two input channels l1 and l2) upon receiving RPC from
> >>> 1.,
> >>>>> takes a snapshot of it's state and:
> >>>>>>> a) broadcast checkpoint barrier down the stream to all channels
> (let’s
> >>>>> ignore for a moment potential for this barrier to overtake the buffer
> >>>>> output data)
> >>>>>>> b) for any input channel for which it hasn’t yet received
> checkpoint
> >>>>> barrier, the data are being added to the checkpoint
> >>>>>>> c) once a channel (for example l1) receives a checkpoint barrier,
> the
> >>>>> Task blocks reads from that channel (?)
> >>>>>>> d) after all remaining channels (l2) receive checkpoint barriers,
> the
> >>>>> Task  first has to process the buffered data after that it can
> unblock
> >>> the
> >>>>> reads from the channels
> >>>>>>>
> >>>>>>> Checkpoint barriers do not cascade/flow through different tasks
> here.
> >>>>> Checkpoint barrier emitted from Task1, reaches only the immediate
> >>>>> downstream Tasks. Thanks to this setup, total checkpointing time is
> not
> >>> sum
> >>>>> of checkpointing times of all Tasks one by one, but more or less max
> of
> >>> the
> >>>>> slowest Tasks. Right?
> >>>>>>>
> >>>>>>> Couple of intriguing thoughts are:
> >>>>>>> 3. checkpoint barriers overtaking the output buffers
> >>>>>>> 4. can we keep processing some data (in order to not waste CPU
> cycles)
> >>>>> after we have taking the snapshot of the Task. I think we could.
> >>>>>>>
> >>>>>>> Piotrek
> >>>>>>>
> >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Great discussion! I'm excited that this is already under
> >>>>> consideration! Are
> >>>>>>>> there any JIRAs or other traces of discussion to follow?
> >>>>>>>>
> >>>>>>>> Paris, if I understand correctly, then the proposal is to not
> block
> >>> any
> >>>>>>>> input channel at all, but only log data from the backpressured
> >>> channel
> >>>>> (and
> >>>>>>>> make it part of the snapshot) until the barrier arrives? This is
> >>>>>>>> intriguing. But probably there is also a benefit of to not
> continue
> >>>>> reading
> >>>>>>>> I1 since that could speed up retrieval from I2. Also, if the user
> >>> code
> >>>>> is
> >>>>>>>> the cause of backpressure, this would avoid pumping more data into
> >>> the
> >>>>>>>> process function.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <
> wangzhijiang...@aliyun.com
> >>>>> .invalid>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Paris,
> >>>>>>>>>
> >>>>>>>>> Thanks for the detailed sharing. And I think it is very similar
> with
> >>>>> the
> >>>>>>>>> way of overtaking we proposed before.
> >>>>>>>>>
> >>>>>>>>> There are some tiny difference:
> >>>>>>>>> The way of overtaking might need to snapshot all the input/output
> >>>>> queues.
> >>>>>>>>> Chandy Lamport seems only need to snaphost (n-1) input channels
> >>> after
> >>>>> the
> >>>>>>>>> first barrier arrives, which might reduce the state sizea bit.
> But
> >>>>> normally
> >>>>>>>>> there should be less buffers for the first input channel with
> >>> barrier.
> >>>>>>>>> The output barrier still follows with regular data stream in
> Chandy
> >>>>>>>>> Lamport, the same way as current flink. For overtaking way, we
> need
> >>>>> to pay
> >>>>>>>>> extra efforts to make barrier transport firstly before outque
> queue
> >>> on
> >>>>>>>>> upstream side, and change the way of barrier alignment based on
> >>>>> receiving
> >>>>>>>>> instead of current reading on downstream side.
> >>>>>>>>> In the backpressure caused by data skew, the first barrier in
> almost
> >>>>> empty
> >>>>>>>>> input channel should arrive much eariler than the last heavy load
> >>>>> input
> >>>>>>>>> channel, so the Chandy Lamport could benefit well. But for the
> case
> >>>>> of all
> >>>>>>>>> balanced heavy load input channels, I mean the first arrived
> barrier
> >>>>> might
> >>>>>>>>> still take much time, then the overtaking way could still fit
> well
> >>> to
> >>>>> speed
> >>>>>>>>> up checkpoint.
> >>>>>>>>> Anyway, your proposed suggestion is helpful on my side,
> especially
> >>>>>>>>> considering some implementation details .
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Zhijiang
> >>>>>>>>>
> ------------------------------------------------------------------
> >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
> >>>>>>>>> Send Time:2019年8月13日(星期二) 14:03
> >>>>>>>>> To:dev <dev@flink.apache.org>
> >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
> >>>>>>>>> Subject:Re: Checkpointing under backpressure
> >>>>>>>>>
> >>>>>>>>> yes! It’s quite similar I think.  Though mind that the devil is
> in
> >>> the
> >>>>>>>>> details, i.e., the temporal order actions are taken.
> >>>>>>>>>
> >>>>>>>>> To clarify, let us say you have a task T with two input channels
> I1
> >>>>> and I2.
> >>>>>>>>> The Chandy Lamport execution flow is the following:
> >>>>>>>>>
> >>>>>>>>> 1) T receives barrier from  I1 and...
> >>>>>>>>> 2)  ...the following three actions happen atomically
> >>>>>>>>> I )  T snapshots its state T*
> >>>>>>>>> II)  T forwards marker to its outputs
> >>>>>>>>> III) T starts logging all events of I2 (only) into a buffer M*
> >>>>>>>>> - Also notice here that T does NOT block I1 as it does in aligned
> >>>>>>>>> snapshots -
> >>>>>>>>> 3) Eventually T receives barrier from I2 and stops recording
> events.
> >>>>> Its
> >>>>>>>>> asynchronously captured snapshot is now complete: {T*,M*}.
> >>>>>>>>> Upon recovery all messages of M* should be replayed in FIFO
> order.
> >>>>>>>>>
> >>>>>>>>> With this approach alignment does not create a deadlock situation
> >>>>> since
> >>>>>>>>> anyway 2.II happens asynchronously and messages can be logged as
> >>> well
> >>>>>>>>> asynchronously during the process of the snapshot. If there is
> >>>>>>>>> back-pressure in a pipeline the cause is most probably not this
> >>>>> algorithm.
> >>>>>>>>>
> >>>>>>>>> Back to your observation, the answer : yes and no.  In your
> network
> >>>>> model,
> >>>>>>>>> I can see the logic of “logging” and “committing” a final
> snapshot
> >>>>> being
> >>>>>>>>> provided by the channel implementation. However, do mind that the
> >>>>> first
> >>>>>>>>> barrier always needs to go “all the way” to initiate the Chandy
> >>>>> Lamport
> >>>>>>>>> algorithm logic.
> >>>>>>>>>
> >>>>>>>>> The above flow has been proven using temporal logic in my phd
> thesis
> >>>>> in
> >>>>>>>>> case you are interested about the proof.
> >>>>>>>>> I hope this helps a little clarifying things. Let me know if
> there
> >>> is
> >>>>> any
> >>>>>>>>> confusing point to disambiguate. I would be more than happy to
> help
> >>>>> if I
> >>>>>>>>> can.
> >>>>>>>>>
> >>>>>>>>> Paris
> >>>>>>>>>
> >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com>
> >>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport snapshots
> don’t
> >>>>> you
> >>>>>>>>> still have to wait for the “checkpoint barrier” to arrive in
> order
> >>> to
> >>>>> know
> >>>>>>>>> when have you already received all possible messages from the
> >>> upstream
> >>>>>>>>> tasks/operators? So instead of processing the “in flight”
> messages
> >>>>> (as the
> >>>>>>>>> Flink is doing currently), you are sending them to an “observer”?
> >>>>>>>>>>
> >>>>>>>>>> In that case, that’s sounds similar to “checkpoint barriers
> >>>>> overtaking
> >>>>>>>>> in flight records” (aka unaligned checkpoints). Just for us, the
> >>>>> observer
> >>>>>>>>> is a snapshot state.
> >>>>>>>>>>
> >>>>>>>>>> Piotrek
> >>>>>>>>>>
> >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <
> seniorcarb...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Interesting problem! Thanks for bringing it up Thomas.
> >>>>>>>>>>>
> >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe Chandy-Lamport
> >>>>> snapshots
> >>>>>>>>> [1] would help out solve this problem more elegantly without
> >>>>> sacrificing
> >>>>>>>>> correctness.
> >>>>>>>>>>> - They do not need alignment, only (async) logging for
> in-flight
> >>>>>>>>> records between the time the first barrier is processed until the
> >>> last
> >>>>>>>>> barrier arrives in a task.
> >>>>>>>>>>> - They work fine for failure recovery as long as logged records
> >>> are
> >>>>>>>>> replayed on startup.
> >>>>>>>>>>>
> >>>>>>>>>>> Flink’s “alligned” savepoints would probably be still necessary
> >>> for
> >>>>>>>>> transactional sink commits + any sort of reconfiguration (e.g.,
> >>>>> rescaling,
> >>>>>>>>> updating the logic of operators to evolve an application etc.).
> >>>>>>>>>>>
> >>>>>>>>>>> I don’t completely understand the “overtaking” approach but if
> you
> >>>>> have
> >>>>>>>>> a concrete definition I would be happy to check it out and help
> if I
> >>>>> can!
> >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by logging
> things
> >>> in
> >>>>>>>>> pending channels in a task snapshot before the barrier arrives.
> >>>>>>>>>>>
> >>>>>>>>>>> -Paris
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> >>>>> <
> >>>>>>>>> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com
> >
> >>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>
> >>>>>>>>>>>> As Zhijiang has responded, we are now in the process of
> >>> discussing
> >>>>> how
> >>>>>>>>> to address this issue and one of the solution that we are
> discussing
> >>>>> is
> >>>>>>>>> exactly what you are proposing: checkpoint barriers overtaking
> the
> >>> in
> >>>>>>>>> flight data and make the in flight data part of the checkpoint.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If everything works well, we will be able to present result of
> >>> our
> >>>>>>>>> discussions on the dev mailing list soon.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Piotrek
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <
> wangzhijiang...@aliyun.com
> >>>>> .INVALID>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for proposing this concern. The barrier alignment
> takes
> >>>>> long
> >>>>>>>>> time in backpressure case which could cause several problems:
> >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned.
> >>>>>>>>>>>>> 2. The recovery cost is high once failover, because much data
> >>>>> needs
> >>>>>>>>> to be replayed.
> >>>>>>>>>>>>> 3. The delay for commit-based sink is high in exactly-once.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For credit-based flow control from release-1.5, the amount of
> >>>>>>>>> in-flighting buffers before barrier alignment is reduced, so we
> >>> could
> >>>>> get a
> >>>>>>>>> bit
> >>>>>>>>>>>>> benefits from speeding checkpoint aspect.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the channels which
> >>>>> already
> >>>>>>>>> received the barrier in practice. But actually we ever did the
> >>>>> similar thing
> >>>>>>>>>>>>> to speed barrier alighment before. I am not quite sure that
> >>>>>>>>> release-1.8 covers this feature. There were some relevant
> >>> discussions
> >>>>> under
> >>>>>>>>> jira [1].
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> For release-1.10, the community is now discussing the
> feature of
> >>>>>>>>> unaligned checkpoint which is mainly for resolving above
> concerns.
> >>> The
> >>>>>>>>> basic idea
> >>>>>>>>>>>>> is to make barrier overtakes the output/input buffer queue to
> >>>>> speed
> >>>>>>>>> alignment, and snapshot the input/output buffers as part of
> >>> checkpoint
> >>>>>>>>> state. The
> >>>>>>>>>>>>> details have not confirmed yet and is still under discussion.
> >>>>> Wish we
> >>>>>>>>> could make some improvments for the release-1.10.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Zhijiang
> >>>>>>>>>>>>>
> >>> ------------------------------------------------------------------
> >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org>
> >>>>>>>>>>>>> Send Time:2019年8月12日(星期一) 21:38
> >>>>>>>>>>>>> To:dev <dev@flink.apache.org>
> >>>>>>>>>>>>> Subject:Checkpointing under backpressure
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> One of the major operational difficulties we observe with
> Flink
> >>>>> are
> >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking for both
> >>>>>>>>> confirmation
> >>>>>>>>>>>>> of my understanding of the current behavior as well as
> pointers
> >>>>> for
> >>>>>>>>> future
> >>>>>>>>>>>>> improvement work:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Prior to introduction of credit based flow control in the
> >>> network
> >>>>>>>>> stack [1]
> >>>>>>>>>>>>> [2], checkpoint barriers would back up with the data for all
> >>>>> logical
> >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5, the
> buffers
> >>> are
> >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are only held
> >>>>> back for
> >>>>>>>>>>>>> channels that have backpressure, while others can continue
> >>>>> processing
> >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot "overtake
> >>>>> data",
> >>>>>>>>>>>>> therefore checkpoint alignment remains affected for the
> channel
> >>>>> with
> >>>>>>>>>>>>> backpressure, with the potential for slow checkpointing and
> >>>>> timeouts.
> >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the maximum
> >>>>> in-transit
> >>>>>>>>>>>>> buffers per channel, resulting in an improvement compared to
> >>>>> previous
> >>>>>>>>>>>>> versions of Flink. Also, the backpressure based checkpoint
> >>>>> alignment
> >>>>>>>>> can
> >>>>>>>>>>>>> help the barrier advance faster on the receiver side (by
> >>>>> suspending
> >>>>>>>>>>>>> channels that have already delivered the barrier). Is that
> >>>>> accurate
> >>>>>>>>> as of
> >>>>>>>>>>>>> Flink 1.8?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What appears to be missing to completely unblock
> checkpointing
> >>> is
> >>>>> a
> >>>>>>>>>>>>> mechanism for checkpoints to overtake the data. That would
> help
> >>> in
> >>>>>>>>>>>>> situations where the processing itself is the bottleneck and
> >>>>>>>>> prioritization
> >>>>>>>>>>>>> in the network stack alone cannot address the barrier delay.
> Was
> >>>>>>>>> there any
> >>>>>>>>>>>>> related discussion? One possible solution would be to drain
> >>>>> incoming
> >>>>>>>>> data
> >>>>>>>>>>>>> till the barrier and make it part of the checkpoint instead
> of
> >>>>>>>>> processing
> >>>>>>>>>>>>> it. This is somewhat related to asynchronous processing, but
> I'm
> >>>>>>>>> thinking
> >>>>>>>>>>>>> more of a solution that is automated in the Flink runtime for
> >>> the
> >>>>>>>>>>>>> backpressure scenario only.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>> https://flink.apache.org/2019/06/05/flink-network-stack.html
> >>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>
> >>>>>>>>>
> >>>>>
> >>>
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >
>
>

Reply via email to