Hi guys, if I understand correctly, will only some checkpoints be
recovered when there is an error in the Flink batch?

On Tue, Feb 8, 2022 at 19:05, Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I second Chesnay's comment and would like to better understand the
> motivation behind this. On the surface it sounds to me like this might
> require quite a bit of work for a very narrow use case.
>
> At the same time I have a feeling that Yuan, you are mixing up this feature
> request (checkpointing subgraphs/pipeline regions independently) with a
> very different issue of "task local checkpoints"? Those problems are kind
> of similar, but not quite the same.
>
> Best,
> Piotrek
>
> On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler <ches...@apache.org> wrote:
>
> > Could someone expand on these operational issues you're facing when
> > achieving this via separate jobs?
> >
> > I feel like we're skipping a step, arguing about solutions without even
> > having discussed the underlying problem.
> >
> > On 08/02/2022 11:25, Gen Luo wrote:
> > > Hi,
> > >
> > > @Yuan
> > > Do you mean that there should be no shared state between source
> > > subtasks?
> > > Sharing state between checkpoints of a specific subtask should be fine.
> > >
> > > Sharing state between subtasks of a task can be an issue, no matter
> > > whether it's a source. That's also what I was afraid of in the previous
> > > replies. In short, if the behavior of a pipeline region can somehow
> > > influence the state of other pipeline regions, their checkpoints have to
> > > be aligned before rescaling.
> > >
> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >
> > >> Hey Folks,
> > >>
> > >> Thanks for the discussion!
> > >>
> > >> *Motivation and use cases*
> > >> I think the motivation and use cases are very clear, and I have no
> > >> doubts on this part.
> > >> A typical use case is ETL with two-phase commit: hundreds of partitions
> > >> can be blocked by a single straggler (a single task's checkpoint abort
> > >> can affect all of them; it does not have to be a failure).
> > >>
> > >> *Source offset redistribution*
> > >> As for the known sources and implementations for Flink, I cannot find a
> > >> case that does not work, *for now*.
> > >> I need to dig a bit more: how splits are tracked (assigned, not
> > >> successfully processed, successfully processed, etc.).
> > >> I guess it is a single shared source OPCoordinator. And how is this
> > >> *shared* state (between tasks) preserved?
> > >>
> > >> *Input partitions/splits treated completely independently from each
> > >> other*
> > >> I am still not sure about this part if, as mentioned in the section
> > >> above, we have shared state in the source.
> > >>
> > >> To Thomas:
> > >>> In Yuan's example, is there a reason why CP8 could not be promoted to
> > >>> CP10 by the coordinator for PR2 once it receives the notification that
> > >>> CP10 did not complete? It appears that should be possible since in its
> > >>> effect it should be no different than no data processed between CP8
> > >>> and CP10?
> > >> Not sure what "promoted" means here, but
> > >> 1. I guess it does not matter whether it is CP8 or CP10 any more
> > >> if there is no shared state in the source, exactly as you mentioned:
> > >> "it should be no different than no data processed between CP8 and CP10"
> > >>
> > >> 2. I've noticed from this question that there is a gap between
> > >> "*allow aborted/failed checkpoint in independent sub-graph*" and
> > >> my intention: "*independent sub-graph checkpointing independently*"
> > >>
> > >> Best
> > >> Yuan
> > >>
> > >>
> > >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I'm thinking about Yuan's case. Let's assume that the case is running
> > >>> in current Flink:
> > >>> 1. CP8 finishes
> > >>> 2. For some reason, PR2 stops consuming records from the source (but is
> > >>> not stuck), and PR1 continues consuming new records.
> > >>> 3. CP9 and CP10 finish
> > >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the
> > >>> same final status as in Yuan's case before CP11 starts.
> > >>>
> > >>> I suppose that in this case, the status of the job can be the same as
> > >>> in Yuan's case, and the snapshot (including source states) taken at CP10
> > >>> should be the same as the composed global snapshot in Yuan's case, which
> > >>> combines CP10 of PR1 and CP8 of PR2. This should be true if neither
> > >>> failed checkpointing nor uncommitted consuming has side effects, both of
> > >>> which can break the exactly-once semantics when replaying. So I think
> > >>> there should be no difference between rescaling the combined global
> > >>> snapshot and the globally taken one, i.e. if the input partitions are
> > >>> not independent, we are probably not able to rescale the source state
> > >>> in current Flink either.
> > >>>
> > >>> And @Thomas, I do agree that the operational burden is
> > >>> significantly reduced, while I'm a little afraid that checkpointing the
> > >>> subgraphs individually may bring back most of the runtime overhead.
> > >>> Maybe we can find a better way to implement this.
> > >>>
> > >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> Thanks for opening this discussion! The proposed enhancement would be
> > >>>> interesting for use cases in our infrastructure as well.
> > >>>>
> > >>>> There are scenarios where it makes sense to have multiple disconnected
> > >>>> subgraphs in a single job because it can significantly reduce the
> > >>>> operational burden as well as the runtime overhead. Since we allow
> > >>>> subgraphs to recover independently, why not also allow them to make
> > >>>> progress independently? That would imply that checkpointing must
> > >>>> succeed for unaffected subgraphs, as certain behavior is tied to
> > >>>> checkpoint completion, like Kafka offset commit, file output, etc.
> > >>>>
> > >>>> As for source offset redistribution, the offset/position needs to be
> > >>>> tied to splits (in FLIP-27 as well as in legacy sources). This applies
> > >>>> to both the Kafka and Kinesis legacy sources and the FLIP-27 Kafka
> > >>>> source. With the new source framework, it would be hard to implement a
> > >>>> source with correct behavior that does not track the position along
> > >>>> with the split.
> > >>>>
> > >>>> In Yuan's example, is there a reason why CP8 could not be promoted to
> > >>>> CP10 by the coordinator for PR2 once it receives the notification that
> > >>>> CP10 did not complete? It appears that should be possible since in its
> > >>>> effect it should be no different than no data processed between CP8
> > >>>> and CP10?
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>> Thanks for the clarification Yuan and Gen,
> > >>>>>
> > >>>>> I agree that the checkpointing of the sources needs to support the
> > >>>>> rescaling case, otherwise it does not work. Is there currently a
> > >>>>> source implementation where this wouldn't work? For Kafka it should
> > >>>>> work because we store the offset per assigned partition. For Kinesis
> > >>>>> it is probably the same. For the FileSource we store the set of unread
> > >>>>> input splits in the source coordinator and the state of the assigned
> > >>>>> splits in the sources. This should probably also work since new splits
> > >>>>> are only handed out to running tasks.
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Till
> > >>>>>
> > >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >>>>>> Hey Till,
> > >>>>>>
> > >>>>>>> Why rescaling is a problem for pipelined regions/independent
> > >>>>>>> execution subgraphs:
> > >>>>>>
> > >>>>>> Take a simplified example :
> > >>>>>> job graph : source  (2 instances) -> sink (2 instances)
> > >>>>>> execution graph:
> > >>>>>> source (1/2)  -> sink (1/2)   [pipelined region 1]
> > >>>>>> source (2/2)  -> sink (2/2)   [pipelined region 2]
> > >>>>>>
> > >>>>>> Let's assume checkpoints are still triggered globally, meaning
> > >>>>>> different pipelined regions share the global checkpoint id (PR1 CP1
> > >>>>>> matches PR2 CP1).
> > >>>>>>
> > >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >>>>>>
> > >>>>>> Let's say we want to rescale to parallelism 3 due to increased input.
> > >>>>>> - Notice that we cannot simply rescale based on the latest globally
> > >>>>>> completed checkpoint (CP8), because PR1 has already output data
> > >>>>>> (CP8 -> CP10) externally.
> > >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on
> > >>>>>> how the source's offset redistribution is implemented.
> > >>>>>>     The answer is yes if we treat each input partition as independent
> > >>>>>> from each other, *but I am not sure whether we can make that
> > >>>>>> assumption*.
> > >>>>>> If not, the rescaling cannot happen until PR1 and PR2 are aligned
> > >>>>>> with CPs.
> > >>>>>> Best
> > >>>>>> -Yuan
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> Yuan and Gen, could you elaborate why rescaling is a problem if we
> > >>>>>>> say that separate pipelined regions can take checkpoints
> > >>>>>>> independently?
> > >>>>>>> Conceptually, I somehow think that a pipelined region that has
> > >>>>>>> failed and cannot create a new checkpoint is more or less the same
> > >>>>>>> as a pipelined region that didn't get new input, or a very slow
> > >>>>>>> pipelined region which couldn't read new records since the last
> > >>>>>>> checkpoint (assuming that the checkpoint coordinator can create a
> > >>>>>>> global checkpoint by combining individual checkpoints, e.g. taking
> > >>>>>>> the last completed checkpoint from each pipelined region). If this
> > >>>>>>> comparison is correct, then this would mean that we have rescaling
> > >>>>>>> problems under the latter two cases.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Till
> > >>>>>>>
> > >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com> wrote:
> > >>>>>>>> Hi Gyula,
> > >>>>>>>>
> > >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can
> > >>>>>>>> discuss this within two scopes. One is the job subgraph, the other
> > >>>>>>>> is the execution subgraph, which I suppose is the same as
> > >>>>>>>> PipelineRegion.
> > >>>>>>>>
> > >>>>>>>> One idea is to checkpoint the PipelineRegions individually, for
> > >>>>>>>> recovery within a single run.
> > >>>>>>>>
> > >>>>>>>> Flink now supports PipelineRegion-based failover, using a subset of
> > >>>>>>>> a global checkpoint snapshot. The checkpoint barriers are spread
> > >>>>>>>> within a PipelineRegion, so the checkpointing of individual
> > >>>>>>>> PipelineRegions is actually independent. Since the PipelineRegions
> > >>>>>>>> are fixed in a single run of a job, we can checkpoint separated
> > >>>>>>>> PipelineRegions individually, regardless of the status of the other
> > >>>>>>>> PipelineRegions, and use a snapshot of a failing region to recover,
> > >>>>>>>> instead of the subset of a global snapshot. This can support
> > >>>>>>>> separated job subgraphs as well, since they will also be separated
> > >>>>>>>> into different PipelineRegions. I think this can fulfill your
> > >>>>>>>> needs.
> > >>>>>>>>
> > >>>>>>>> In fact the individual snapshots of all PipelineRegions can form a
> > >>>>>>>> global snapshot, and the alignment of snapshots of individual
> > >>>>>>>> regions is not necessary. But rescaling this global snapshot can be
> > >>>>>>>> potentially complex. I think it's better to use the individual
> > >>>>>>>> snapshots within a single run, and take a global
> > >>>>>>>> checkpoint/savepoint before restarting the job, whether rescaling
> > >>>>>>>> it or not.
> > >>>>>>>>
> > >>>>>>>> A major issue of this plan is that it breaks the checkpoint
> > >>>>>>>> mechanism of Flink. As far as I know, even in approximate recovery,
> > >>>>>>>> the snapshot used to recover a single task is still a part of a
> > >>>>>>>> global snapshot. To implement the individual checkpointing of
> > >>>>>>>> PipelineRegions, there may need to be a checkpoint coordinator for
> > >>>>>>>> each PipelineRegion, and a new global checkpoint coordinator. When
> > >>>>>>>> the scale goes up, there can be many individual regions, which can
> > >>>>>>>> be a big burden on the job manager. The meaning of the checkpoint
> > >>>>>>>> id will also change, which can affect many aspects. There can be
> > >>>>>>>> lots of work and risks, and the risks still exist if we only
> > >>>>>>>> individually checkpoint separated job subgraphs, since the
> > >>>>>>>> mechanism is still broken. If that is what you need, maybe
> > >>>>>>>> separating them into different jobs is an easier and better choice,
> > >>>>>>>> as Caizhi and Yuan mentioned.
> > >>>>>>>>
> > >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >>>>>>>>> Hey Gyula,
> > >>>>>>>>>
> > >>>>>>>>> That's a very interesting idea. The discussion about the
> > >>>>>>>>> `Individual` vs `Global` checkpoint was raised before, but the
> > >>>>>>>>> main concerns were twofold:
> > >>>>>>>>>
> > >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent view of
> > >>>>>>>>> the checkpoint
> > >>>>>>>>> - It is not easy to form a clear cut between past and future, and
> > >>>>>>>>> hence there is no clear cut of where the source should begin to
> > >>>>>>>>> replay from.
> > >>>>>>>>> Starting from independent subgraphs as you proposed may be a good
> > >>>>>>>>> starting point. However, when we talk about a subgraph, do we mean
> > >>>>>>>>> a job subgraph (each vertex is one or more operators) or an
> > >>>>>>>>> execution subgraph (each vertex is a task instance)?
> > >>>>>>>>>
> > >>>>>>>>> If it is a job subgraph, then indeed, why not separate it into
> > >>>>>>>>> multiple jobs as Caizhi mentioned.
> > >>>>>>>>> If it is an execution subgraph, then it is difficult to handle
> > >>>>>>>>> rescaling due to inconsistent views of checkpoints between tasks
> > >>>>>>>>> of the same operator.
> > >>>>>>>>>
> > >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an interesting
> > >>>>>>>>> direction to think about, and I'd love to hear more from you!
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>>
> > >>>>>>>>> Yuan
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <tsreape...@gmail.com> wrote:
> > >>>>>>>>>> Hi Gyula!
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for raising this discussion. I agree that this would be an
> > >>>>>>>>>> interesting feature, but I actually have some doubts about the
> > >>>>>>>>>> motivation and use case. If there are multiple individual
> > >>>>>>>>>> subgraphs in the same job, why not just distribute them to
> > >>>>>>>>>> multiple jobs so that each job contains only one individual graph
> > >>>>>>>>>> and can fail without disturbing the others?
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Feb 7, 2022 at 05:22, Gyula Fóra <gyf...@apache.org> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi all!
> > >>>>>>>>>>>
> > >>>>>>>>>>> At the moment checkpointing only works for healthy jobs with all
> > >>>>>>>>>>> tasks running (or some finished). This sounds reasonable in most
> > >>>>>>>>>>> cases, but there are a few applications where it would make
> > >>>>>>>>>>> sense to checkpoint failing jobs as well.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs that have
> > >>>>>>>>>>> a failing task cannot be checkpointed without violating the
> > >>>>>>>>>>> exactly-once semantics. However, if the job has multiple
> > >>>>>>>>>>> independent subgraphs (that are not connected to each other),
> > >>>>>>>>>>> even if one subgraph is failing, the other fully running one
> > >>>>>>>>>>> could be checkpointed. In these cases the tasks of the failing
> > >>>>>>>>>>> subgraph could simply inherit the last successful checkpoint
> > >>>>>>>>>>> metadata (from before they started failing). This logic would
> > >>>>>>>>>>> produce a consistent checkpoint.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The job as a whole could now make stateful progress even if some
> > >>>>>>>>>>> subgraphs are constantly failing. This can be very valuable if
> > >>>>>>>>>>> for some reason the job has a large number of independent
> > >>>>>>>>>>> subgraphs that are expected to fail every once in a while, or if
> > >>>>>>>>>>> some subgraphs can have longer downtimes that would currently
> > >>>>>>>>>>> cause the whole job to stall.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you think?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Gyula
> > >>>>>>>>>>>
> >
> >
>

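For readers following the thread, here is a minimal Python sketch of the scenario Yuan and Till describe: compose a global snapshot from the last completed checkpoint of each pipelined region (CP10 of PR1, CP8 of PR2), then rescale to parallelism 3. It is not Flink code. `RegionCheckpoint`, `compose_global_snapshot`, `redistribute_offsets`, the split names and the offsets are all hypothetical, and the sketch assumes exactly what the thread leaves open: that every offset is tied to its own split and no state is shared between source subtasks.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RegionCheckpoint:
    """Hypothetical record of one completed checkpoint of one pipelined region."""
    region: str
    checkpoint_id: int
    split_offsets: Dict[str, int]  # split name -> next offset to read


def compose_global_snapshot(completed: List[RegionCheckpoint]) -> Dict[str, RegionCheckpoint]:
    """Keep, for every region, its checkpoint with the highest id."""
    latest: Dict[str, RegionCheckpoint] = {}
    for cp in completed:
        current = latest.get(cp.region)
        if current is None or cp.checkpoint_id > current.checkpoint_id:
            latest[cp.region] = cp
    return latest


def redistribute_offsets(snapshot: Dict[str, RegionCheckpoint],
                         parallelism: int) -> List[Dict[str, int]]:
    """Assign splits round-robin to the new subtasks; each split keeps its offset.

    This only works under the independence assumption: a split must belong
    to exactly one region, otherwise two regions disagree about its offset.
    """
    merged: Dict[str, int] = {}
    for cp in snapshot.values():
        for split, offset in cp.split_offsets.items():
            if split in merged:
                raise ValueError(f"split {split} is shared between regions")
            merged[split] = offset
    assignments: List[Dict[str, int]] = [{} for _ in range(parallelism)]
    for index, split in enumerate(sorted(merged)):
        assignments[index % parallelism][split] = merged[split]
    return assignments


# PR1 completed CP8 and CP10, PR2 only completed CP8 (made-up offsets).
completed = [
    RegionCheckpoint("PR1", 8, {"p0": 80, "p1": 80}),
    RegionCheckpoint("PR2", 8, {"p2": 80, "p3": 80}),
    RegionCheckpoint("PR1", 10, {"p0": 100, "p1": 100}),
]
snapshot = compose_global_snapshot(completed)
assignments = redistribute_offsets(snapshot, parallelism=3)
```

If any state were shared across regions (the open question about the source coordinator), per-split redistribution like this would no longer be sufficient, and the regions would have to be aligned on one checkpoint before rescaling.
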