Thanks for the update and write up Arvid.

Piotrek

czw., 30 lip 2020 o 11:05 Arvid Heise <ar...@ververica.com> napisał(a):

> Dear all,
>
> I just wanted to follow-up on this long discussion thread by announcing
> that we implemented unaligned checkpoints in Flink 1.11. If you experience
> long end-to-end checkpointing duration, you should try out unaligned
> checkpoints [1] if the following applies:
>
>    - Checkpointing is not bottlenecked by I/O (to state backend). Possible
>    reasons are: slow connections, rate limits, or huge operator or user
> state.
>    - You can attribute the long duration to slow data flow. An operator in
>    the pipeline is severely lagging behind and you can easily see it in
> Flink
>    Web UI.
>    - You cannot alleviate the problem by adjusting the degree of
>    parallelism to the slow operator, either because of temporal spikes or
> lags
>    or because you don’t control the application in a platform-as-a-service
>    architecture.
>
> You can enable it in the flink-conf.yaml.
> execution.checkpointing.unaligned: true
>
> Or in your application:
> env.getCheckpointConfig().enableUnalignedCheckpoints() (Java/Scala)
> env.get_checkpoint_config().enable_unaligned_checkpoints() (Python)
>
> Note that this relatively young feature still has a couple of limitations
> that we resolve in future versions.
>
>    - You cannot rescale or change the job graph when starting from an
>    unaligned checkpoint; you have to take a savepoint before rescaling.
>    Savepoints are always aligned, independent of the alignment setting of
>    checkpoints. This feature has the highest priority and will be
> available in
>    upcoming releases.
>    - Flink currently does not support concurrent unaligned checkpoints.
>    However, due to the more predictable and shorter checkpointing times,
>    concurrent checkpoints might not be needed at all. However, savepoints
> can
>    also not happen concurrently to unaligned checkpoints, so they will take
>    slightly longer.
>    - SourceFunctions are user-defined, run a separate thread, and output
>    records under lock. When they block because of backpressure, the induced
>    checkpoints cannot acquire the lock and checkpointing duration
> increases.
>    We will provide SourceFunctions a way to also avoid blocking and
> implement
>    it for all sources in Flink core, but because the code is ultimately
>    user-defined, we have no way to guarantee non-blocking behavior.
>    Nevertheless, since only sources are affected, the checkpointing
> durations
>    are still much lower and most importantly do not increase with further
>    shuffles. Furthermore, Flink 1.11 also provides a new way to implement
>    sources (FLIP-27). This new source interface has a better threading
> model,
>    such that users do not create their own threads anymore and Flink can
>    guarantee non-blocking behavior for these sources.
>    - Unaligned checkpoints break with an implicit guarantee in respect to
>    watermarks during recovery. Currently, Flink generates the watermark as
> a
>    first step of recovery instead of storing the latest watermark in the
>    operators to ease rescaling. For unaligned checkpoints, this means
> that, on
>    recovery, Flink generates watermarks after it restores in-flight data.
> If
>    your pipeline uses an operator that applies the latest watermark on each
>    record, it will produce different results than for aligned checkpoints.
> If
>    your operator depends on the latest watermark being always available,
> then
>    the proper solution is to store the watermark in the operator state. To
>    support rescaling, watermarks should be stored per key-group in a
>    union-state. This feature has a high priority.
>    - Lastly, there is a conceptual weakness in unaligned checkpoints: when
>    an operator produces an arbitrary amount of outputs for a single input,
>    such as flatMap, all of these output records need to be stored into the
>    state for the unaligned checkpoint, which may increase the state size by
>    orders of magnitudes and slow down checkpointing and recovery. However,
>    since flatMap only needs alignment after a shuffle and rarely produces a
>    huge number of records for a single input, it’s more of a theoretic
>    problem.
>
> Lastly, we also plan to improve the configurations, such that ultimately,
> unaligned checkpoints will be the default configuration.
>
>    - Users will be able to configure a timeout, such that each operator
>    first tries to perform an aligned checkpoint. If the timeout is
> triggered,
>    it switches to an unaligned checkpoint. Since the timeout would only
>    trigger in the niche use cases that unaligned checkpoints addresses, it
>    would mostly perform an aligned checkpoint under no or low backpressure.
>    Thus, together with the previously mentioned fixes for the limitation,
> this
>    timeout would allow Flink to enable unaligned checkpoints by default.
>    - Another idea is to provide users to define a maximum state size for
>    the in-flight data. However, it might be hard for users to configure the
>    size correctly as it also requires to know how many buffers are used in
> the
>    respective application and it might be even harder to actually use the
> size
>    limit in a meaningful way.
>    - Lastly, to address the flatMap issue, there will be an option to
>    trigger the unaligned checkpoints on the last barrier of all input
> channels
>    instead of the first. Thus, there is still an alignment phase but it
> should
>    be rather quick as checkpoint barriers are still inserted at the head of
>    the output buffer queue. Conceptually, checkpoint barriers would still
> not
>    depend on the data flow.
>
> We are currently preparing a blog post on this topic, from which I copied
> some passages. We are happy to hear your feedback.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/stateful-stream-processing.html#unaligned-checkpointing
>
> On Wed, Dec 4, 2019 at 9:07 PM Thomas Weise <t...@apache.org> wrote:
>
> > Hi Arvid,
> >
> > Thanks for putting together the proposal [1]
> >
> > I'm planning to take a closer look in the next few days.
> >
> > Has any of the work been translated to JIRAs yet and what would be the
> > approximate target release?
> >
> > Thanks,
> > Thomas
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> >
> > On Wed, Oct 2, 2019 at 12:11 PM Arvid Heise <ar...@ververica.com> wrote:
> >
> >> Sry incorrect link, please follow [1].
> >>
> >> [1]
> >>
> >>
> https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3CCAGZNd0FgVL0oDQJHpBwJ1Ha8QevsVG0FHixdet11tLhW2p-2hg%40mail.gmail.com%3E
> >>
> >> On Wed, Oct 2, 2019 at 3:44 PM Arvid Heise <ar...@ververica.com> wrote:
> >>
> >> > FYI, we published FLIP-76 to address the issue and discussion has been
> >> > opened in [1].
> >> >
> >> > Looking forward to your feedback,
> >> >
> >> > Arvid
> >> >
> >> > [1]
> >> >
> https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser
> >> >
> >> > On Thu, Aug 15, 2019 at 9:43 AM Yun Gao <yungao...@aliyun.com.invalid
> >
> >> > wrote:
> >> >
> >> >> Hi,
> >> >>     Very thanks for the great points!
> >> >>
> >> >>     For the prioritizing inputs, from another point of view, I think
> it
> >> >> might not cause other bad effects, since we do not need to totally
> >> block
> >> >> the channels that have seen barriers after the operator has taking
> >> >> snapshot. After the snapshotting, if the channels that has not seen
> >> >> barriers have buffers, we could first logging and processing these
> >> buffers
> >> >> and if they do not have buffers, we can still processing the buffers
> >> from
> >> >> the channels that has seen barriers. Therefore, It seems prioritizing
> >> >> inputs should be able to accelerate the checkpoint without other bad
> >> >> effects.
> >> >>
> >> >>    and @zhijiangFor making the unaligned checkpoint the only
> mechanism
> >> >> for all cases, I still think we should allow a configurable timeout
> >> after
> >> >> receiving the first barrier so that the channels may get "drained"
> >> during
> >> >> the timeout, as pointed out by Stephan. With such a timeout, we are
> >> very
> >> >> likely not need to snapshot the input buffers, which would be very
> >> similar
> >> >> to the current aligned checkpoint mechanism.
> >> >>
> >> >> Best,
> >> >> Yun
> >> >>
> >> >>
> >> >> ------------------------------------------------------------------
> >> >> From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> >> >> Send Time:2019 Aug. 15 (Thu.) 02:22
> >> >> To:dev <dev@flink.apache.org>
> >> >> Subject:Re: Checkpointing under backpressure
> >> >>
> >> >> > For the checkpoint to complete, any buffer that
> >> >> > arrived prior to the barrier would be to be part of the
> checkpointed
> >> >> state.
> >> >>
> >> >> Yes, I agree.
> >> >>
> >> >> > So wouldn't it be important to finish persisting these buffers as
> >> fast
> >> >> as
> >> >> > possible by prioritizing respective inputs? The task won't be able
> to
> >> >> > process records from the inputs that have seen the barrier fast
> when
> >> it
> >> >> is
> >> >> > already backpressured (or causing the backpressure).
> >> >>
> >> >> My previous understanding of prioritizing inputs is from task
> >> processing
> >> >> aspect after snapshot state. If from the persisting buffers aspect, I
> >> think
> >> >> it might be up to how we implement it.
> >> >> If we only tag/reference which buffers in inputs be the part of
> state,
> >> >> and make the real persisting work is done in async way. That means
> the
> >> >> already tagged buffers could be processed by task w/o priority.
> >> >> And only after all the persisting work done, the task would report to
> >> >> coordinator of finished checkpoint on its side. The key point is how
> we
> >> >> implement to make task could continue processing buffers as soon as
> >> >> possible.
> >> >>
> >> >> Thanks for the further explannation of requirements for speeding up
> >> >> checkpoints in backpressure scenario. To make the savepoint finish
> >> quickly
> >> >> and then tune the setting to avoid backpressure is really a pratical
> >> case.
> >> >> I think this solution could cover this concern.
> >> >>
> >> >> Best,
> >> >> Zhijiang
> >> >> ------------------------------------------------------------------
> >> >> From:Thomas Weise <t...@apache.org>
> >> >> Send Time:2019年8月14日(星期三) 19:48
> >> >> To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> >> >> Subject:Re: Checkpointing under backpressure
> >> >>
> >> >> -->
> >> >>
> >> >> On Wed, Aug 14, 2019 at 10:23 AM zhijiang
> >> >> <wangzhijiang...@aliyun.com.invalid> wrote:
> >> >>
> >> >> > Thanks for these great points and disccusions!
> >> >> >
> >> >> > 1. Considering the way of triggering checkpoint RPC calls to all
> the
> >> >> tasks
> >> >> > from Chandy Lamport, it combines two different mechanisms together
> to
> >> >> make
> >> >> > sure that the trigger could be fast in different scenarios.
> >> >> > But in flink world it might be not very worth trying that way, just
> >> as
> >> >> > Stephan's analysis for it. Another concern is that it might bring
> >> more
> >> >> > heavy loads for JobMaster broadcasting this checkpoint RPC to all
> the
> >> >> tasks
> >> >> > in large scale job, especially for the very short checkpoint
> >> interval.
> >> >> > Furthermore it would also cause other important RPC to be executed
> >> >> delay to
> >> >> > bring potentail timeout risks.
> >> >> >
> >> >> > 2. I agree with the idea of drawing on the way "take state snapshot
> >> on
> >> >> > first barrier" from Chandy Lamport instead of barrier alignment
> >> >> combining
> >> >> > with unaligned checkpoints in flink.
> >> >> >
> >> >> > > >>>> The benefit would be less latency increase in the channels
> >> which
> >> >> > already have received barriers.
> >> >> > > >>>> However, as mentioned before, not prioritizing the inputs
> from
> >> >> > which barriers are still missing can also have an adverse effect.
> >> >> >
> >> >> > I think we will not have an adverse effect if not prioritizing the
> >> >> inputs
> >> >> > w/o barriers in this case. After sync snapshot, the task could
> >> actually
> >> >> > process any input channels. For the input channel receiving the
> first
> >> >> > barrier, we already have the obvious boundary for persisting
> buffers.
> >> >> For
> >> >> > other channels w/o barriers we could persist the following buffers
> >> for
> >> >> > these channels until barrier arrives in network. Because based on
> the
> >> >> > credit based flow control, the barrier does not need credit to
> >> >> transport,
> >> >> > then as long as the sender overtakes the barrier accross the output
> >> >> queue,
> >> >> > the network stack would transport this barrier immediately no
> matter
> >> >> with
> >> >> > the inputs condition on receiver side. So there is no requirements
> to
> >> >> > consume accumulated buffers in these channels for higher priority.
> If
> >> >> so it
> >> >> > seems that we will not waste any CPU cycles as Piotr concerns
> before.
> >> >> >
> >> >>
> >> >> I'm not sure I follow this. For the checkpoint to complete, any
> buffer
> >> >> that
> >> >> arrived prior to the barrier would be to be part of the checkpointed
> >> >> state.
> >> >> So wouldn't it be important to finish persisting these buffers as
> fast
> >> as
> >> >> possible by prioritizing respective inputs? The task won't be able to
> >> >> process records from the inputs that have seen the barrier fast when
> >> it is
> >> >> already backpressured (or causing the backpressure).
> >> >>
> >> >>
> >> >> >
> >> >> > 3. Suppose the unaligned checkpoints performing well in practice,
> is
> >> it
> >> >> > possible to make it as the only mechanism for handling all the
> >> cases? I
> >> >> > mean for the non-backpressure scenario, there are less buffers even
> >> >> empty
> >> >> > in input/output queue, then the "overtaking barrier--> trigger
> >> snapshot
> >> >> on
> >> >> > first barrier--> persist buffers" might still work well. So we do
> not
> >> >> need
> >> >> > to maintain two suits of mechanisms finally.
> >> >> >
> >> >> > 4.  The initial motivation of this dicussion is for checkpoint
> >> timeout
> >> >> in
> >> >> > backpressure scenario. If we adjust the default timeout to a very
> big
> >> >> > value, that means the checkpoint would never timeout and we only
> >> need to
> >> >> > wait it finish. Then are there still any other problems/concerns if
> >> >> > checkpoint takes long time to finish? Althougn we already knew some
> >> >> issues
> >> >> > before, it is better to gather more user feedbacks to confirm which
> >> >> aspects
> >> >> > could be solved in this feature design. E.g. the sink commit delay
> >> might
> >> >> > not be coverd by unaligned solution.
> >> >> >
> >> >>
> >> >> Checkpoints taking too long is the concern that sparks this
> discussion
> >> >> (timeout is just a symptom). The slowness issue also applies to the
> >> >> savepoint use case. We would need to be able to take a savepoint fast
> >> in
> >> >> order to roll forward a fix that can alleviate the backpressure (like
> >> >> changing parallelism or making a different configuration change).
> >> >>
> >> >>
> >> >> >
> >> >> > Best,
> >> >> > Zhijiang
> >> >> > ------------------------------------------------------------------
> >> >> > From:Stephan Ewen <se...@apache.org>
> >> >> > Send Time:2019年8月14日(星期三) 17:43
> >> >> > To:dev <dev@flink.apache.org>
> >> >> > Subject:Re: Checkpointing under backpressure
> >> >> >
> >> >> > Quick note: The current implementation is
> >> >> >
> >> >> > Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)
> >> >> >
> >> >> > On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <
> pi...@ververica.com>
> >> >> > wrote:
> >> >> >
> >> >> > > > Thanks for the great ideas so far.
> >> >> > >
> >> >> > > +1
> >> >> > >
> >> >> > > Regarding other things raised, I mostly agree with Stephan.
> >> >> > >
> >> >> > > I like the idea of simultaneously starting the checkpoint
> >> everywhere
> >> >> via
> >> >> > > RPC call (especially in cases where Tasks are busy doing some
> >> >> synchronous
> >> >> > > operations for example for tens of milliseconds. In that case
> every
> >> >> > network
> >> >> > > exchange adds tens of milliseconds of delay in propagating the
> >> >> > checkpoint).
> >> >> > > However I agree that this might be a premature optimisation
> >> assuming
> >> >> the
> >> >> > > current state of our code (we already have checkpoint barriers).
> >> >> > >
> >> >> > > However I like the idea of switching from:
> >> >> > >
> >> >> > > 1. A -> S -> F (Align -> snapshot -> forward markers)
> >> >> > >
> >> >> > > To
> >> >> > >
> >> >> > > 2. S -> F -> L (Snapshot -> forward markers -> log pending
> >> channels)
> >> >> > >
> >> >> > > Or even to
> >> >> > >
> >> >> > > 6. F -> S -> L (Forward markers -> snapshot -> log pending
> >> channels)
> >> >> > >
> >> >> > > It feels to me like this would decouple propagation of
> checkpoints
> >> >> from
> >> >> > > costs of synchronous snapshots and waiting for all of the
> >> checkpoint
> >> >> > > barriers to arrive (even if they will overtake in-flight records,
> >> this
> >> >> > > might take some time).
> >> >> > >
> >> >> > > > What I like about the Chandy Lamport approach (2.) initiated
> from
> >> >> > > sources is that:
> >> >> > > >       - Snapshotting imposes no modification to normal
> >> processing.
> >> >> > >
> >> >> > > Yes, I agree that would be nice. Currently, during the alignment
> >> and
> >> >> > > blocking of the input channels, we might be wasting CPU cycles of
> >> up
> >> >> > stream
> >> >> > > tasks. If we succeed in designing new checkpointing mechanism to
> >> not
> >> >> > > disrupt/block regular data processing (% the extra IO cost for
> >> logging
> >> >> > the
> >> >> > > in-flight records), that would be a huge improvement.
> >> >> > >
> >> >> > > Piotrek
> >> >> > >
> >> >> > > > On 14 Aug 2019, at 14:56, Paris Carbone <
> seniorcarb...@gmail.com
> >> >
> >> >> > wrote:
> >> >> > > >
> >> >> > > > Sure I see. In cases when no periodic aligned snapshots are
> >> employed
> >> >> > > this is the only option.
> >> >> > > >
> >> >> > > > Two things that were not highlighted enough so far on the
> >> proposed
> >> >> > > protocol (included my mails):
> >> >> > > >       - The Recovery/Reconfiguration strategy should strictly
> >> >> > prioritise
> >> >> > > processing logged events before entering normal task input
> >> operation.
> >> >> > > Otherwise causality can be violated. This also means dataflow
> >> recovery
> >> >> > will
> >> >> > > be expected to be slower to the one employed on an aligned
> >> snapshot.
> >> >> > > >       - Same as with state capture, markers should be forwarded
> >> upon
> >> >> > > first marker received on input. No later than that. Otherwise we
> >> have
> >> >> > > duplicate side effects.
> >> >> > > >
> >> >> > > > Thanks for the great ideas so far.
> >> >> > > >
> >> >> > > > Paris
> >> >> > > >
> >> >> > > >> On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org>
> >> wrote:
> >> >> > > >>
> >> >> > > >> Scaling with unaligned checkpoints might be a necessity.
> >> >> > > >>
> >> >> > > >> Let's assume the job failed due to a lost TaskManager, but no
> >> new
> >> >> > > >> TaskManager becomes available.
> >> >> > > >> In that case we need to scale down based on the latest
> complete
> >> >> > > checkpoint,
> >> >> > > >> because we cannot produce a new checkpoint.
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <
> >> >> > seniorcarb...@gmail.com>
> >> >> > > >> wrote:
> >> >> > > >>
> >> >> > > >>> +1 I think we are on the same page Stephan.
> >> >> > > >>>
> >> >> > > >>> Rescaling on unaligned checkpoint sounds challenging and a
> bit
> >> >> > > >>> unnecessary. No?
> >> >> > > >>> Why not sticking to aligned snapshots for live
> >> >> > > reconfiguration/rescaling?
> >> >> > > >>> It’s a pretty rare operation and it would simplify things by
> a
> >> >> lot.
> >> >> > > >>> Everything can be “staged” upon alignment including replacing
> >> >> > channels
> >> >> > > and
> >> >> > > >>> tasks.
> >> >> > > >>>
> >> >> > > >>> -Paris
> >> >> > > >>>
> >> >> > > >>>> On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org>
> >> wrote:
> >> >> > > >>>>
> >> >> > > >>>> Hi all!
> >> >> > > >>>>
> >> >> > > >>>> Yes, the first proposal of "unaligend checkpoints" (probably
> >> two
> >> >> > years
> >> >> > > >>> back
> >> >> > > >>>> now) drew a major inspiration from Chandy Lamport, as did
> >> >> actually
> >> >> > the
> >> >> > > >>>> original checkpointing algorithm.
> >> >> > > >>>>
> >> >> > > >>>> "Logging data between first and last barrier" versus
> "barrier
> >> >> > jumping
> >> >> > > >>> over
> >> >> > > >>>> buffer and storing those buffers" is pretty close same.
> >> >> > > >>>> However, there are a few nice benefits of the proposal of
> >> >> unaligned
> >> >> > > >>>> checkpoints over Chandy-Lamport.
> >> >> > > >>>>
> >> >> > > >>>> *## Benefits of Unaligned Checkpoints*
> >> >> > > >>>>
> >> >> > > >>>> (1) It is very similar to the original algorithm (can be
> seen
> >> an
> >> >> an
> >> >> > > >>>> optional feature purely in the network stack) and thus can
> >> share
> >> >> > > lot's of
> >> >> > > >>>> code paths.
> >> >> > > >>>>
> >> >> > > >>>> (2) Less data stored. If we make the "jump over buffers"
> part
> >> >> > timeout
> >> >> > > >>> based
> >> >> > > >>>> (for example barrier overtakes buffers if not flushed within
> >> >> 10ms)
> >> >> > > then
> >> >> > > >>>> checkpoints are in the common case of flowing pipelines
> >> aligned
> >> >> > > without
> >> >> > > >>>> in-flight data. Only back pressured cases store some
> in-flight
> >> >> data,
> >> >> > > >>> which
> >> >> > > >>>> means we don't regress in the common case and only fix the
> >> back
> >> >> > > pressure
> >> >> > > >>>> case.
> >> >> > > >>>>
> >> >> > > >>>> (3) Faster checkpoints. Chandy Lamport still waits for all
> >> >> barriers
> >> >> > to
> >> >> > > >>>> arrive naturally, logging on the way. If data processing is
> >> slow,
> >> >> > this
> >> >> > > >>> can
> >> >> > > >>>> still take quite a while.
> >> >> > > >>>>
> >> >> > > >>>> ==> I think both these points are strong reasons to not
> change
> >> >> the
> >> >> > > >>>> mechanism away from "trigger sources" and start with
> CL-style
> >> >> > "trigger
> >> >> > > >>> all".
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> *## Possible ways to combine Chandy Lamport and Unaligned
> >> >> > Checkpoints*
> >> >> > > >>>>
> >> >> > > >>>> We can think about something like "take state snapshot on
> >> first
> >> >> > > barrier"
> >> >> > > >>>> and then store buffers until the other barriers arrive.
> Inside
> >> >> the
> >> >> > > >>> network
> >> >> > > >>>> stack, barriers could still overtake and persist buffers.
> >> >> > > >>>> The benefit would be less latency increase in the channels
> >> which
> >> >> > > already
> >> >> > > >>>> have received barriers.
> >> >> > > >>>> However, as mentioned before, not prioritizing the inputs
> from
> >> >> which
> >> >> > > >>>> barriers are still missing can also have an adverse effect.
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning upgrades*
> >> >> > > >>>>
> >> >> > > >>>> I think it is a fair restriction to say that upgrades need
> to
> >> >> happen
> >> >> > > on
> >> >> > > >>>> aligned checkpoints. It is a rare enough operation.
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning re-scaling (changing parallelism)*
> >> >> > > >>>>
> >> >> > > >>>> We need to support that on unaligned checkpoints as well.
> >> There
> >> >> are
> >> >> > > >>> several
> >> >> > > >>>> feature proposals about automatic scaling, especially down
> >> >> scaling
> >> >> > in
> >> >> > > >>> case
> >> >> > > >>>> of missing resources. The last snapshot might be a regular
> >> >> > > checkpoint, so
> >> >> > > >>>> all checkpoints need to support rescaling.
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> *## Concerning end-to-end checkpoint duration and "trigger
> >> >> sources"
> >> >> > > >>> versus
> >> >> > > >>>> "trigger all"*
> >> >> > > >>>>
> >> >> > > >>>> I think for the end-to-end checkpoint duration, an "overtake
> >> >> > buffers"
> >> >> > > >>>> approach yields faster checkpoints, as mentioned above
> (Chandy
> >> >> > Lamport
> >> >> > > >>>> logging still needs to wait for barrier to flow).
> >> >> > > >>>>
> >> >> > > >>>> I don't see the benefit of a "trigger all tasks via RPC
> >> >> > concurrently"
> >> >> > > >>>> approach. Bear in mind that it is still a globally
> coordinated
> >> >> > > approach
> >> >> > > >>> and
> >> >> > > >>>> you need to wait for the global checkpoint to complete
> before
> >> >> > > committing
> >> >> > > >>>> any side effects.
> >> >> > > >>>> I believe that the checkpoint time is more determined by the
> >> >> state
> >> >> > > >>>> checkpoint writing, and the global coordination and metadata
> >> >> commit,
> >> >> > > than
> >> >> > > >>>> by the difference in alignment time between "trigger from
> >> source
> >> >> and
> >> >> > > jump
> >> >> > > >>>> over buffers" versus "trigger all tasks concurrently".
> >> >> > > >>>>
> >> >> > > >>>> Trying to optimize a few tens of milliseconds out of the
> >> network
> >> >> > stack
> >> >> > > >>>> sends (and changing the overall checkpointing approach
> >> completely
> >> >> > for
> >> >> > > >>> that)
> >> >> > > >>>> while staying with a globally coordinated checkpoint will
> >> send us
> >> >> > > down a
> >> >> > > >>>> path to a dead end.
> >> >> > > >>>>
> >> >> > > >>>> To really bring task persistence latency down to 10s of
> >> >> milliseconds
> >> >> > > (so
> >> >> > > >>> we
> >> >> > > >>>> can frequently commit in sinks), we need to take an approach
> >> >> without
> >> >> > > any
> >> >> > > >>>> global coordination. Tasks need to establish a persistent
> >> >> recovery
> >> >> > > point
> >> >> > > >>>> individually and at their own discretion, only then can it
> be
> >> >> > frequent
> >> >> > > >>>> enough. To get there, they would need to decouple themselves
> >> from
> >> >> > the
> >> >> > > >>>> predecessor and successor tasks (via something like
> persistent
> >> >> > > channels).
> >> >> > > >>>> This is a different discussion, though, somewhat orthogonal
> to
> >> >> this
> >> >> > > one
> >> >> > > >>>> here.
> >> >> > > >>>>
> >> >> > > >>>> Best,
> >> >> > > >>>> Stephan
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <
> >> >> > pi...@ververica.com>
> >> >> > > >>> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> Hi again,
> >> >> > > >>>>>
> >> >> > > >>>>> Zhu Zhu let me think about this more. Maybe as Paris is
> >> >> writing, we
> >> >> > > do
> >> >> > > >>> not
> >> >> > > >>>>> need to block any channels at all, at least assuming credit
> >> base
> >> >> > flow
> >> >> > > >>>>> control. Regarding what should happen with the following
> >> >> checkpoint
> >> >> > > is
> >> >> > > >>>>> another question. Also, should we support concurrent
> >> checkpoints
> >> >> > and
> >> >> > > >>>>> subsuming checkpoints as we do now? Maybe not…
> >> >> > > >>>>>
> >> >> > > >>>>> Paris
> >> >> > > >>>>>
> >> >> > > >>>>> Re
> >> >> > > >>>>> I. 2. a) and b) - yes, this would have to be taken into an
> >> >> account
> >> >> > > >>>>> I. 2. c) and IV. 2. - without those, end to end checkpoint
> >> time
> >> >> > will
> >> >> > > >>>>> probably be longer than it could be. It might affect
> external
> >> >> > > systems.
> >> >> > > >>> For
> >> >> > > >>>>> example Kafka, which automatically time outs lingering
> >> >> > transactions,
> >> >> > > and
> >> >> > > >>>>> for us, the transaction time is equal to the time between
> two
> >> >> > > >>> checkpoints.
> >> >> > > >>>>>
> >> >> > > >>>>> II 1. - I’m confused. To make things straight. Flink is
> >> >> currently
> >> >> > > >>>>> snapshotting once it receives all of the checkpoint
> barriers
> >> >> from
> >> >> > > all of
> >> >> > > >>>>> the input channels and only then it broadcasts the
> checkpoint
> >> >> > barrier
> >> >> > > >>> down
> >> >> > > >>>>> the stream. And this is correct from exactly-once
> >> perspective.
> >> >> > > >>>>>
> >> >> > > >>>>> As far as I understand, your proposal based on Chandy
> Lamport
> >> >> > > algorithm,
> >> >> > > >>>>> is snapshotting the state of the operator on the first
> >> >> checkpoint
> >> >> > > >>> barrier,
> >> >> > > >>>>> which also looks correct to me.
> >> >> > > >>>>>
> >> >> > > >>>>> III. 1. As I responded to Zhu Zhu, let me think a bit more
> >> about
> >> >> > > this.
> >> >> > > >>>>>
> >> >> > > >>>>> V. Yes, we still need aligned checkpoints, as they are
> easier
> >> >> for
> >> >> > > state
> >> >> > > >>>>> migration and upgrades.
> >> >> > > >>>>>
> >> >> > > >>>>> Piotrek
> >> >> > > >>>>>
> >> >> > > >>>>>> On 14 Aug 2019, at 11:22, Paris Carbone <
> >> >> seniorcarb...@gmail.com>
> >> >> > > >>> wrote:
> >> >> > > >>>>>>
> >> >> > > >>>>>> Now I see a little more clearly what you have in mind.
> >> Thanks
> >> >> for
> >> >> > > the
> >> >> > > >>>>> explanation!
> >> >> > > >>>>>> There are a few intermixed concepts here, some how to do
> >> with
> >> >> > > >>>>> correctness some with performance.
> >> >> > > >>>>>> Before delving deeper I will just enumerate a few things
> to
> >> >> make
> >> >> > > myself
> >> >> > > >>>>> a little more helpful if I can.
> >> >> > > >>>>>>
> >> >> > > >>>>>> I. Initiation
> >> >> > > >>>>>> -------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. RPC to sources only is a less intrusive way to initiate
> >> >> > snapshots
> >> >> > > >>>>> since you utilize better pipeline parallelism (only a small
> >> >> subset
> >> >> > of
> >> >> > > >>> tasks
> >> >> > > >>>>> is running progressively the protocol at a time, if
> >> >> snapshotting is
> >> >> > > >>> async
> >> >> > > >>>>> the overall overhead might not even be observable).
> >> >> > > >>>>>>
> >> >> > > >>>>>> 2. If we really want an RPC to all initiation take notice
> of
> >> >> the
> >> >> > > >>>>> following implications:
> >> >> > > >>>>>>
> >> >> > > >>>>>>    a. (correctness) RPC calls are not guaranteed to arrive
> >> in
> >> >> > every
> >> >> > > >>>>> task before a marker from a preceding task.
> >> >> > > >>>>>>
> >> >> > > >>>>>>    b. (correctness) Either the RPC call OR the first
> >> arriving
> >> >> > marker
> >> >> > > >>>>> should initiate the algorithm. Whichever comes first. If
> you
> >> >> only
> >> >> > do
> >> >> > > it
> >> >> > > >>> per
> >> >> > > >>>>> RPC call then you capture a "late" state that includes side
> >> >> effects
> >> >> > > of
> >> >> > > >>>>> already logged events.
> >> >> > > >>>>>>
> >> >> > > >>>>>>    c. (performance) Lots of IO will be invoked at the same
> >> >> time on
> >> >> > > >>>>> the backend store from all tasks. This might lead to high
> >> >> > congestion
> >> >> > > in
> >> >> > > >>>>> async snapshots.
> >> >> > > >>>>>>
> >> >> > > >>>>>> II. Capturing State First
> >> >> > > >>>>>> -------------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. (correctness) Capturing state at the last marker sounds
> >> >> > > incorrect to
> >> >> > > >>>>> me (state contains side effects of already logged events
> >> based
> >> >> on
> >> >> > the
> >> >> > > >>>>> proposed scheme). This results into duplicate processing.
> No?
> >> >> > > >>>>>>
> >> >> > > >>>>>> III. Channel Blocking / "Alignment"
> >> >> > > >>>>>> -----------------------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. (performance?) What is the added benefit? We dont want
> a
> >> >> > > "complete"
> >> >> > > >>>>> transactional snapshot, async snapshots are purely for
> >> >> > > failure-recovery.
> >> >> > > >>>>> Thus, I dont see why this needs to be imposed at the
> expense
> >> of
> >> >> > > >>>>> performance/throughput. With the proposed scheme the whole
> >> >> dataflow
> >> >> > > >>> anyway
> >> >> > > >>>>> enters snapshotting/logging mode so tasks more or less
> >> snapshot
> >> >> > > >>>>> concurrently.
> >> >> > > >>>>>>
> >> >> > > >>>>>> IV Marker Bypassing
> >> >> > > >>>>>> -------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. (correctness) This leads to equivalent in-flight
> >> snapshots
> >> >> so
> >> >> > > with
> >> >> > > >>>>> some quick thinking  correct. I will try to model this
> later
> >> and
> >> >> > get
> >> >> > > >>> back
> >> >> > > >>>>> to you in case I find something wrong.
> >> >> > > >>>>>>
> >> >> > > >>>>>> 2. (performance) It also sounds like a meaningful
> >> >> optimisation! I
> >> >> > > like
> >> >> > > >>>>> thinking of this as a push-based snapshot. i.e., the
> >> producing
> >> >> task
> >> >> > > >>> somehow
> >> >> > > >>>>> triggers forward a consumer/channel to capture its state.
> By
> >> >> > example
> >> >> > > >>>>> consider T1 -> |marker t1| -> T2.
> >> >> > > >>>>>>
> >> >> > > >>>>>> V. Usage of "Async" Snapshots
> >> >> > > >>>>>> ---------------------
> >> >> > > >>>>>>
> >> >> > > >>>>>> 1. Do you see this as a full replacement of "full" aligned
> >> >> > > >>>>> snapshots/savepoints? In my view async shanpshots will be
> >> needed
> >> >> > from
> >> >> > > >>> time
> >> >> > > >>>>> to time but not as frequently. Yet, it seems like a valid
> >> >> approach
> >> >> > > >>> solely
> >> >> > > >>>>> for failure-recovery on the same configuration. Here's why:
> >> >> > > >>>>>>
> >> >> > > >>>>>>    a. With original snapshotting there is a strong duality
> >> >> between
> >> >> > > >>>>>>    a stream input (offsets) and committed side effects
> >> >> (internal
> >> >> > > >>>>> states and external commits to transactional sinks). While
> in
> >> >> the
> >> >> > > async
> >> >> > > >>>>> version, there are uncommitted operations (inflight
> records).
> >> >> Thus,
> >> >> > > you
> >> >> > > >>>>> cannot use these snapshots for e.g., submitting sql queries
> >> with
> >> >> > > >>> snapshot
> >> >> > > >>>>> isolation. Also, the original snapshotting gives a lot of
> >> >> potential
> >> >> > > for
> >> >> > > >>>>> flink to make proper transactional commits externally.
> >> >> > > >>>>>>
> >> >> > > >>>>>>    b. Reconfiguration is very tricky, you probably know
> that
> >> >> > better.
> >> >> > > >>>>> Inflight channel state is no longer valid in a new
> >> configuration
> >> >> > > (i.e.,
> >> >> > > >>> new
> >> >> > > >>>>> dataflow graph, new operators, updated operator logic,
> >> different
> >> >> > > >>> channels,
> >> >> > > >>>>> different parallelism)
> >> >> > > >>>>>>
> >> >> > > >>>>>> 2. Async snapshots can also be potentially useful for
> >> >> monitoring
> >> >> > the
> >> >> > > >>>>> general health of a dataflow since they can be analyzed by
> >> the
> >> >> task
> >> >> > > >>> manager
> >> >> > > >>>>> about the general performance of a job graph and spot
> >> >> bottlenecks
> >> >> > for
> >> >> > > >>>>> example.
> >> >> > > >>>>>>
> >> >> > > >>>>>>> On 14 Aug 2019, at 09:08, Piotr Nowojski <
> >> pi...@ververica.com
> >> >> >
> >> >> > > wrote:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Hi,
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Thomas:
> >> >> > > >>>>>>> There are no Jira tickets yet (or maybe there is
> something
> >> >> very
> >> >> > old
> >> >> > > >>>>> somewhere). First we want to discuss it, next present FLIP
> >> and
> >> >> at
> >> >> > > last
> >> >> > > >>>>> create tickets :)
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>> if I understand correctly, then the proposal is to not
> >> block
> >> >> any
> >> >> > > >>>>>>>> input channel at all, but only log data from the
> >> >> backpressured
> >> >> > > >>> channel
> >> >> > > >>>>> (and
> >> >> > > >>>>>>>> make it part of the snapshot) until the barrier arrives
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> I would guess that it would be better to block the reads,
> >> >> unless
> >> >> > we
> >> >> > > >>> can
> >> >> > > >>>>> already process the records from the blocked channel…
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Paris:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Thanks for the explanation Paris. I’m starting to
> >> understand
> >> >> this
> >> >> > > more
> >> >> > > >>>>> and I like the idea of snapshotting the state of an
> operator
> >> >> before
> >> >> > > >>>>> receiving all of the checkpoint barriers - this would allow
> >> more
> >> >> > > things
> >> >> > > >>> to
> >> >> > > >>>>> happen at the same time instead of sequentially. As
> Zhijiang
> >> has
> >> >> > > pointed
> >> >> > > >>>>> out there are some things not considered in your proposal:
> >> >> > overtaking
> >> >> > > >>>>> output buffers, but maybe those things could be
> incorporated
> >> >> > > together.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Another thing is that from the wiki description I
> >> understood
> >> >> that
> >> >> > > the
> >> >> > > >>>>> initial checkpointing is not initialised by any checkpoint
> >> >> barrier,
> >> >> > > but
> >> >> > > >>> by
> >> >> > > >>>>> an independent call/message from the Observer. I haven’t
> >> played
> >> >> > with
> >> >> > > >>> this
> >> >> > > >>>>> idea a lot, but I had some discussion with Nico and it
> seems
> >> >> that
> >> >> > it
> >> >> > > >>> might
> >> >> > > >>>>> work:
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> 1. JobManager sends and RPC “start checkpoint” to all
> tasks
> >> >> > > >>>>>>> 2. Task (with two input channels l1 and l2) upon
> receiving
> >> RPC
> >> >> > from
> >> >> > > >>> 1.,
> >> >> > > >>>>> takes a snapshot of it's state and:
> >> >> > > >>>>>>> a) broadcast checkpoint barrier down the stream to all
> >> >> channels
> >> >> > > (let’s
> >> >> > > >>>>> ignore for a moment potential for this barrier to overtake
> >> the
> >> >> > buffer
> >> >> > > >>>>> output data)
> >> >> > > >>>>>>> b) for any input channel for which it hasn’t yet received
> >> >> > > checkpoint
> >> >> > > >>>>> barrier, the data are being added to the checkpoint
> >> >> > > >>>>>>> c) once a channel (for example l1) receives a checkpoint
> >> >> barrier,
> >> >> > > the
> >> >> > > >>>>> Task blocks reads from that channel (?)
> >> >> > > >>>>>>> d) after all remaining channels (l2) receive checkpoint
> >> >> barriers,
> >> >> > > the
> >> >> > > >>>>> Task  first has to process the buffered data after that it
> >> can
> >> >> > > unblock
> >> >> > > >>> the
> >> >> > > >>>>> reads from the channels
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Checkpoint barriers do not cascade/flow through different
> >> >> tasks
> >> >> > > here.
> >> >> > > >>>>> Checkpoint barrier emitted from Task1, reaches only the
> >> >> immediate
> >> >> > > >>>>> downstream Tasks. Thanks to this setup, total checkpointing
> >> >> time is
> >> >> > > not
> >> >> > > >>> sum
> >> >> > > >>>>> of checkpointing times of all Tasks one by one, but more or
> >> less
> >> >> > max
> >> >> > > of
> >> >> > > >>> the
> >> >> > > >>>>> slowest Tasks. Right?
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Couple of intriguing thoughts are:
> >> >> > > >>>>>>> 3. checkpoint barriers overtaking the output buffers
> >> >> > > >>>>>>> 4. can we keep processing some data (in order to not
> waste
> >> CPU
> >> >> > > cycles)
> >> >> > > >>>>> after we have taking the snapshot of the Task. I think we
> >> could.
> >> >> > > >>>>>>>
> >> >> > > >>>>>>> Piotrek
> >> >> > > >>>>>>>
> >> >> > > >>>>>>>> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org>
> >> >> wrote:
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> Great discussion! I'm excited that this is already under
> >> >> > > >>>>> consideration! Are
> >> >> > > >>>>>>>> there any JIRAs or other traces of discussion to follow?
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> Paris, if I understand correctly, then the proposal is
> to
> >> not
> >> >> > > block
> >> >> > > >>> any
> >> >> > > >>>>>>>> input channel at all, but only log data from the
> >> >> backpressured
> >> >> > > >>> channel
> >> >> > > >>>>> (and
> >> >> > > >>>>>>>> make it part of the snapshot) until the barrier arrives?
> >> >> This is
> >> >> > > >>>>>>>> intriguing. But probably there is also a benefit of to
> not
> >> >> > > continue
> >> >> > > >>>>> reading
> >> >> > > >>>>>>>> I1 since that could speed up retrieval from I2. Also, if
> >> the
> >> >> > user
> >> >> > > >>> code
> >> >> > > >>>>> is
> >> >> > > >>>>>>>> the cause of backpressure, this would avoid pumping more
> >> data
> >> >> > into
> >> >> > > >>> the
> >> >> > > >>>>>>>> process function.
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> Thanks,
> >> >> > > >>>>>>>> Thomas
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <
> >> >> > > wangzhijiang...@aliyun.com
> >> >> > > >>>>> .invalid>
> >> >> > > >>>>>>>> wrote:
> >> >> > > >>>>>>>>
> >> >> > > >>>>>>>>> Hi Paris,
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Thanks for the detailed sharing. And I think it is very
> >> >> similar
> >> >> > > with
> >> >> > > >>>>> the
> >> >> > > >>>>>>>>> way of overtaking we proposed before.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> There are some tiny difference:
> >> >> > > >>>>>>>>> The way of overtaking might need to snapshot all the
> >> >> > input/output
> >> >> > > >>>>> queues.
> >> >> > > >>>>>>>>> Chandy Lamport seems only need to snaphost (n-1) input
> >> >> channels
> >> >> > > >>> after
> >> >> > > >>>>> the
> >> >> > > >>>>>>>>> first barrier arrives, which might reduce the state
> sizea
> >> >> bit.
> >> >> > > But
> >> >> > > >>>>> normally
> >> >> > > >>>>>>>>> there should be less buffers for the first input
> channel
> >> >> with
> >> >> > > >>> barrier.
> >> >> > > >>>>>>>>> The output barrier still follows with regular data
> >> stream in
> >> >> > > Chandy
> >> >> > > >>>>>>>>> Lamport, the same way as current flink. For overtaking
> >> way,
> >> >> we
> >> >> > > need
> >> >> > > >>>>> to pay
> >> >> > > >>>>>>>>> extra efforts to make barrier transport firstly before
> >> >> outque
> >> >> > > queue
> >> >> > > >>> on
> >> >> > > >>>>>>>>> upstream side, and change the way of barrier alignment
> >> >> based on
> >> >> > > >>>>> receiving
> >> >> > > >>>>>>>>> instead of current reading on downstream side.
> >> >> > > >>>>>>>>> In the backpressure caused by data skew, the first
> >> barrier
> >> >> in
> >> >> > > almost
> >> >> > > >>>>> empty
> >> >> > > >>>>>>>>> input channel should arrive much eariler than the last
> >> heavy
> >> >> > load
> >> >> > > >>>>> input
> >> >> > > >>>>>>>>> channel, so the Chandy Lamport could benefit well. But
> >> for
> >> >> the
> >> >> > > case
> >> >> > > >>>>> of all
> >> >> > > >>>>>>>>> balanced heavy load input channels, I mean the first
> >> arrived
> >> >> > > barrier
> >> >> > > >>>>> might
> >> >> > > >>>>>>>>> still take much time, then the overtaking way could
> still
> >> >> fit
> >> >> > > well
> >> >> > > >>> to
> >> >> > > >>>>> speed
> >> >> > > >>>>>>>>> up checkpoint.
> >> >> > > >>>>>>>>> Anyway, your proposed suggestion is helpful on my side,
> >> >> > > especially
> >> >> > > >>>>>>>>> considering some implementation details .
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Best,
> >> >> > > >>>>>>>>> Zhijiang
> >> >> > > >>>>>>>>>
> >> >> > >
> ------------------------------------------------------------------
> >> >> > > >>>>>>>>> From:Paris Carbone <seniorcarb...@gmail.com>
> >> >> > > >>>>>>>>> Send Time:2019年8月13日(星期二) 14:03
> >> >> > > >>>>>>>>> To:dev <dev@flink.apache.org>
> >> >> > > >>>>>>>>> Cc:zhijiang <wangzhijiang...@aliyun.com>
> >> >> > > >>>>>>>>> Subject:Re: Checkpointing under backpressure
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> yes! It’s quite similar I think.  Though mind that the
> >> >> devil is
> >> >> > > in
> >> >> > > >>> the
> >> >> > > >>>>>>>>> details, i.e., the temporal order actions are taken.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> To clarify, let us say you have a task T with two input
> >> >> > channels
> >> >> > > I1
> >> >> > > >>>>> and I2.
> >> >> > > >>>>>>>>> The Chandy Lamport execution flow is the following:
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> 1) T receives barrier from  I1 and...
> >> >> > > >>>>>>>>> 2)  ...the following three actions happen atomically
> >> >> > > >>>>>>>>> I )  T snapshots its state T*
> >> >> > > >>>>>>>>> II)  T forwards marker to its outputs
> >> >> > > >>>>>>>>> III) T starts logging all events of I2 (only) into a
> >> buffer
> >> >> M*
> >> >> > > >>>>>>>>> - Also notice here that T does NOT block I1 as it does
> in
> >> >> > aligned
> >> >> > > >>>>>>>>> snapshots -
> >> >> > > >>>>>>>>> 3) Eventually T receives barrier from I2 and stops
> >> recording
> >> >> > > events.
> >> >> > > >>>>> Its
> >> >> > > >>>>>>>>> asynchronously captured snapshot is now complete:
> >> {T*,M*}.
> >> >> > > >>>>>>>>> Upon recovery all messages of M* should be replayed in
> >> FIFO
> >> >> > > order.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> With this approach alignment does not create a deadlock
> >> >> > situation
> >> >> > > >>>>> since
> >> >> > > >>>>>>>>> anyway 2.II happens asynchronously and messages can be
> >> >> logged
> >> >> > as
> >> >> > > >>> well
> >> >> > > >>>>>>>>> asynchronously during the process of the snapshot. If
> >> there
> >> >> is
> >> >> > > >>>>>>>>> back-pressure in a pipeline the cause is most probably
> >> not
> >> >> this
> >> >> > > >>>>> algorithm.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Back to your observation, the answer : yes and no.  In
> >> your
> >> >> > > network
> >> >> > > >>>>> model,
> >> >> > > >>>>>>>>> I can see the logic of “logging” and “committing” a
> final
> >> >> > > snapshot
> >> >> > > >>>>> being
> >> >> > > >>>>>>>>> provided by the channel implementation. However, do
> mind
> >> >> that
> >> >> > the
> >> >> > > >>>>> first
> >> >> > > >>>>>>>>> barrier always needs to go “all the way” to initiate
> the
> >> >> Chandy
> >> >> > > >>>>> Lamport
> >> >> > > >>>>>>>>> algorithm logic.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> The above flow has been proven using temporal logic in
> my
> >> >> phd
> >> >> > > thesis
> >> >> > > >>>>> in
> >> >> > > >>>>>>>>> case you are interested about the proof.
> >> >> > > >>>>>>>>> I hope this helps a little clarifying things. Let me
> >> know if
> >> >> > > there
> >> >> > > >>> is
> >> >> > > >>>>> any
> >> >> > > >>>>>>>>> confusing point to disambiguate. I would be more than
> >> happy
> >> >> to
> >> >> > > help
> >> >> > > >>>>> if I
> >> >> > > >>>>>>>>> can.
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>> Paris
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <
> >> >> pi...@ververica.com
> >> >> > >
> >> >> > > >>>>> wrote:
> >> >> > > >>>>>>>>>>
> >> >> > > >>>>>>>>>> Thanks for the input. Regarding the Chandy-Lamport
> >> >> snapshots
> >> >> > > don’t
> >> >> > > >>>>> you
> >> >> > > >>>>>>>>> still have to wait for the “checkpoint barrier” to
> >> arrive in
> >> >> > > order
> >> >> > > >>> to
> >> >> > > >>>>> know
> >> >> > > >>>>>>>>> when have you already received all possible messages
> from
> >> >> the
> >> >> > > >>> upstream
> >> >> > > >>>>>>>>> tasks/operators? So instead of processing the “in
> flight”
> >> >> > > messages
> >> >> > > >>>>> (as the
> >> >> > > >>>>>>>>> Flink is doing currently), you are sending them to an
> >> >> > “observer”?
> >> >> > > >>>>>>>>>>
> >> >> > > >>>>>>>>>> In that case, that’s sounds similar to “checkpoint
> >> barriers
> >> >> > > >>>>> overtaking
> >> >> > > >>>>>>>>> in flight records” (aka unaligned checkpoints). Just
> for
> >> us,
> >> >> > the
> >> >> > > >>>>> observer
> >> >> > > >>>>>>>>> is a snapshot state.
> >> >> > > >>>>>>>>>>
> >> >> > > >>>>>>>>>> Piotrek
> >> >> > > >>>>>>>>>>
> >> >> > > >>>>>>>>>>> On 13 Aug 2019, at 13:14, Paris Carbone <
> >> >> > > seniorcarb...@gmail.com>
> >> >> > > >>>>>>>>> wrote:
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> Interesting problem! Thanks for bringing it up
> Thomas.
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> Ignore/Correct me if I am wrong but I believe
> >> >> Chandy-Lamport
> >> >> > > >>>>> snapshots
> >> >> > > >>>>>>>>> [1] would help out solve this problem more elegantly
> >> without
> >> >> > > >>>>> sacrificing
> >> >> > > >>>>>>>>> correctness.
> >> >> > > >>>>>>>>>>> - They do not need alignment, only (async) logging
> for
> >> >> > > in-flight
> >> >> > > >>>>>>>>> records between the time the first barrier is processed
> >> >> until
> >> >> > the
> >> >> > > >>> last
> >> >> > > >>>>>>>>> barrier arrives in a task.
> >> >> > > >>>>>>>>>>> - They work fine for failure recovery as long as
> logged
> >> >> > records
> >> >> > > >>> are
> >> >> > > >>>>>>>>> replayed on startup.
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> Flink’s “alligned” savepoints would probably be still
> >> >> > necessary
> >> >> > > >>> for
> >> >> > > >>>>>>>>> transactional sink commits + any sort of
> reconfiguration
> >> >> (e.g.,
> >> >> > > >>>>> rescaling,
> >> >> > > >>>>>>>>> updating the logic of operators to evolve an
> application
> >> >> etc.).
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> I don’t completely understand the “overtaking”
> approach
> >> >> but
> >> >> > if
> >> >> > > you
> >> >> > > >>>>> have
> >> >> > > >>>>>>>>> a concrete definition I would be happy to check it out
> >> and
> >> >> help
> >> >> > > if I
> >> >> > > >>>>> can!
> >> >> > > >>>>>>>>>>> Mind that Chandy-Lamport essentially does this by
> >> logging
> >> >> > > things
> >> >> > > >>> in
> >> >> > > >>>>>>>>> pending channels in a task snapshot before the barrier
> >> >> arrives.
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> -Paris
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>> [1]
> >> >> > > >>>
> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> >> >> > > >>>>> <
> >> >> > > >>>>>>>>>
> >> >> https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
> >> >> > >
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <
> >> >> > pi...@ververica.com
> >> >> > > >
> >> >> > > >>>>> wrote:
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>> Hi Thomas,
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>> As Zhijiang has responded, we are now in the process
> >> of
> >> >> > > >>> discussing
> >> >> > > >>>>> how
> >> >> > > >>>>>>>>> to address this issue and one of the solution that we
> are
> >> >> > > discussing
> >> >> > > >>>>> is
> >> >> > > >>>>>>>>> exactly what you are proposing: checkpoint barriers
> >> >> overtaking
> >> >> > > the
> >> >> > > >>> in
> >> >> > > >>>>>>>>> flight data and make the in flight data part of the
> >> >> checkpoint.
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>> If everything works well, we will be able to present
> >> >> result
> >> >> > of
> >> >> > > >>> our
> >> >> > > >>>>>>>>> discussions on the dev mailing list soon.
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>> Piotrek
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> On 12 Aug 2019, at 23:23, zhijiang <
> >> >> > > wangzhijiang...@aliyun.com
> >> >> > > >>>>> .INVALID>
> >> >> > > >>>>>>>>> wrote:
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Hi Thomas,
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Thanks for proposing this concern. The barrier
> >> alignment
> >> >> > > takes
> >> >> > > >>>>> long
> >> >> > > >>>>>>>>> time in backpressure case which could cause several
> >> >> problems:
> >> >> > > >>>>>>>>>>>>> 1. Checkpoint timeout as you mentioned.
> >> >> > > >>>>>>>>>>>>> 2. The recovery cost is high once failover, because
> >> much
> >> >> > data
> >> >> > > >>>>> needs
> >> >> > > >>>>>>>>> to be replayed.
> >> >> > > >>>>>>>>>>>>> 3. The delay for commit-based sink is high in
> >> >> exactly-once.
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> For credit-based flow control from release-1.5, the
> >> >> amount
> >> >> > of
> >> >> > > >>>>>>>>> in-flighting buffers before barrier alignment is
> reduced,
> >> >> so we
> >> >> > > >>> could
> >> >> > > >>>>> get a
> >> >> > > >>>>>>>>> bit
> >> >> > > >>>>>>>>>>>>> benefits from speeding checkpoint aspect.
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> In release-1.8, I guess we did not suspend the
> >> channels
> >> >> > which
> >> >> > > >>>>> already
> >> >> > > >>>>>>>>> received the barrier in practice. But actually we ever
> >> did
> >> >> the
> >> >> > > >>>>> similar thing
> >> >> > > >>>>>>>>>>>>> to speed barrier alighment before. I am not quite
> >> sure
> >> >> that
> >> >> > > >>>>>>>>> release-1.8 covers this feature. There were some
> relevant
> >> >> > > >>> discussions
> >> >> > > >>>>> under
> >> >> > > >>>>>>>>> jira [1].
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> For release-1.10, the community is now discussing
> the
> >> >> > > feature of
> >> >> > > >>>>>>>>> unaligned checkpoint which is mainly for resolving
> above
> >> >> > > concerns.
> >> >> > > >>> The
> >> >> > > >>>>>>>>> basic idea
> >> >> > > >>>>>>>>>>>>> is to make barrier overtakes the output/input
> buffer
> >> >> queue
> >> >> > to
> >> >> > > >>>>> speed
> >> >> > > >>>>>>>>> alignment, and snapshot the input/output buffers as
> part
> >> of
> >> >> > > >>> checkpoint
> >> >> > > >>>>>>>>> state. The
> >> >> > > >>>>>>>>>>>>> details have not confirmed yet and is still under
> >> >> > discussion.
> >> >> > > >>>>> Wish we
> >> >> > > >>>>>>>>> could make some improvments for the release-1.10.
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> [1]
> https://issues.apache.org/jira/browse/FLINK-8523
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Best,
> >> >> > > >>>>>>>>>>>>> Zhijiang
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>
> >> ------------------------------------------------------------------
> >> >> > > >>>>>>>>>>>>> From:Thomas Weise <t...@apache.org>
> >> >> > > >>>>>>>>>>>>> Send Time:2019年8月12日(星期一) 21:38
> >> >> > > >>>>>>>>>>>>> To:dev <dev@flink.apache.org>
> >> >> > > >>>>>>>>>>>>> Subject:Checkpointing under backpressure
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Hi,
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> One of the major operational difficulties we
> observe
> >> >> with
> >> >> > > Flink
> >> >> > > >>>>> are
> >> >> > > >>>>>>>>>>>>> checkpoint timeouts under backpressure. I'm looking
> >> for
> >> >> > both
> >> >> > > >>>>>>>>> confirmation
> >> >> > > >>>>>>>>>>>>> of my understanding of the current behavior as well
> >> as
> >> >> > > pointers
> >> >> > > >>>>> for
> >> >> > > >>>>>>>>> future
> >> >> > > >>>>>>>>>>>>> improvement work:
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Prior to introduction of credit based flow control
> in
> >> >> the
> >> >> > > >>> network
> >> >> > > >>>>>>>>> stack [1]
> >> >> > > >>>>>>>>>>>>> [2], checkpoint barriers would back up with the
> data
> >> for
> >> >> > all
> >> >> > > >>>>> logical
> >> >> > > >>>>>>>>>>>>> channels due to TCP backpressure. Since Flink 1.5,
> >> the
> >> >> > > buffers
> >> >> > > >>> are
> >> >> > > >>>>>>>>>>>>> controlled per channel, and checkpoint barriers are
> >> only
> >> >> > held
> >> >> > > >>>>> back for
> >> >> > > >>>>>>>>>>>>> channels that have backpressure, while others can
> >> >> continue
> >> >> > > >>>>> processing
> >> >> > > >>>>>>>>>>>>> normally. However, checkpoint barriers still cannot
> >> >> > "overtake
> >> >> > > >>>>> data",
> >> >> > > >>>>>>>>>>>>> therefore checkpoint alignment remains affected for
> >> the
> >> >> > > channel
> >> >> > > >>>>> with
> >> >> > > >>>>>>>>>>>>> backpressure, with the potential for slow
> >> checkpointing
> >> >> and
> >> >> > > >>>>> timeouts.
> >> >> > > >>>>>>>>>>>>> Albeit the delay of barriers would be capped by the
> >> >> maximum
> >> >> > > >>>>> in-transit
> >> >> > > >>>>>>>>>>>>> buffers per channel, resulting in an improvement
> >> >> compared
> >> >> > to
> >> >> > > >>>>> previous
> >> >> > > >>>>>>>>>>>>> versions of Flink. Also, the backpressure based
> >> >> checkpoint
> >> >> > > >>>>> alignment
> >> >> > > >>>>>>>>> can
> >> >> > > >>>>>>>>>>>>> help the barrier advance faster on the receiver
> side
> >> (by
> >> >> > > >>>>> suspending
> >> >> > > >>>>>>>>>>>>> channels that have already delivered the barrier).
> Is
> >> >> that
> >> >> > > >>>>> accurate
> >> >> > > >>>>>>>>> as of
> >> >> > > >>>>>>>>>>>>> Flink 1.8?
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> What appears to be missing to completely unblock
> >> >> > > checkpointing
> >> >> > > >>> is
> >> >> > > >>>>> a
> >> >> > > >>>>>>>>>>>>> mechanism for checkpoints to overtake the data.
> That
> >> >> would
> >> >> > > help
> >> >> > > >>> in
> >> >> > > >>>>>>>>>>>>> situations where the processing itself is the
> >> bottleneck
> >> >> > and
> >> >> > > >>>>>>>>> prioritization
> >> >> > > >>>>>>>>>>>>> in the network stack alone cannot address the
> barrier
> >> >> > delay.
> >> >> > > Was
> >> >> > > >>>>>>>>> there any
> >> >> > > >>>>>>>>>>>>> related discussion? One possible solution would be
> to
> >> >> drain
> >> >> > > >>>>> incoming
> >> >> > > >>>>>>>>> data
> >> >> > > >>>>>>>>>>>>> till the barrier and make it part of the checkpoint
> >> >> instead
> >> >> > > of
> >> >> > > >>>>>>>>> processing
> >> >> > > >>>>>>>>>>>>> it. This is somewhat related to asynchronous
> >> processing,
> >> >> > but
> >> >> > > I'm
> >> >> > > >>>>>>>>> thinking
> >> >> > > >>>>>>>>>>>>> more of a solution that is automated in the Flink
> >> >> runtime
> >> >> > for
> >> >> > > >>> the
> >> >> > > >>>>>>>>>>>>> backpressure scenario only.
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> Thanks,
> >> >> > > >>>>>>>>>>>>> Thomas
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>> [1]
> >> >> > > >>> https://flink.apache.org/2019/06/05/flink-network-stack.html
> >> >> > > >>>>>>>>>>>>> [2]
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>
> >> >> > > >>>
> >> >> > >
> >> >> >
> >> >>
> >>
> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
> >> >> > > >>>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>>
> >> >> > > >>>>>>>>>>>
> >> >> > > >>>>>>>>>>
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>>>
> >> >> > > >>>>>>>
> >> >> > > >>>>>>
> >> >> > > >>>>>
> >> >> > > >>>>>
> >> >> > > >>>
> >> >> > > >>>
> >> >> > > >
> >> >> > >
> >> >> > >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >> >>
> >>
> >
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>

Reply via email to