Hi guys,

If I understand it correctly, will only some checkpoints be recovered when there is an error in the Flink batch?

Piotr Nowojski <pnowoj...@apache.org> wrote on Tue, Feb 8, 2022, at 19:05:

> Hi,
>
> I second Chesnay's comment and would like to better understand the motivation behind this. At the surface it sounds to me like this might require quite a bit of work for a very narrow use case.
>
> At the same time I have a feeling that Yuan, you are mixing this feature request (checkpointing subgraphs/pipeline regions independently) and a very, very different issue of "task local checkpoints"? Those problems are kind of similar, but not quite.
>
> Best,
> Piotrek
>
> On Tue, Feb 8, 2022 at 11:44, Chesnay Schepler <ches...@apache.org> wrote:
>
> > Could someone expand on these operational issues you're facing when achieving this via separate jobs?
> >
> > I feel like we're skipping a step, arguing about solutions without even having discussed the underlying problem.
> >
> > On 08/02/2022 11:25, Gen Luo wrote:
> > > Hi,
> > >
> > > @Yuan
> > > Do you mean that there should be no shared state between source subtasks? Sharing state between checkpoints of a specific subtask should be fine.
> > >
> > > Sharing state between subtasks of a task can be an issue, no matter whether it's a source. That's also what I was afraid of in the previous replies. In one word, if the behavior of a pipeline region can somehow influence the state of other pipeline regions, their checkpoints have to be aligned before rescaling.
> > >
> > > On Tue, Feb 8, 2022 at 5:27 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >
> > >> Hey folks,
> > >>
> > >> Thanks for the discussion!
> > >>
> > >> *Motivation and use cases*
> > >> I think the motivation and use cases are very clear and I do not have doubts on this part. A typical use case is ETL with two-phase commit, where hundreds of partitions can be blocked by a single straggler (a single task's checkpoint abortion can affect all, not necessarily a failure).
> > >>
> > >> *Source offset redistribution*
> > >> As for the known sources & implementations for Flink, I cannot find a case that does not work, *for now*. I need to dig a bit more into how splits are tracked: assigned, not successfully processed, successfully processed, etc. I guess it is a single shared source OperatorCoordinator. And how is this *shared* state (between tasks) preserved?
> > >>
> > >> *Input partitions/splits treated as completely independent from each other*
> > >> About this part I am still not sure, as mentioned, if we have shared state in the source as discussed in the above section.
> > >>
> > >> To Thomas:
> > >>> In Yuan's example, is there a reason why CP8 could not be promoted to CP10 by the coordinator for PR2 once it receives the notification that CP10 did not complete? It appears that should be possible since in its effect it should be no different than no data processed between CP8 and CP10?
> > >> Not sure what "promoted" means here, but
> > >> 1. I guess it does not matter whether it is CP8 or CP10 anymore, if there is no shared state in the source, as exactly what you mentioned: "it should be no different than no data processed between CP8 and CP10".
> > >>
> > >> 2. I've noticed from this question that there is a gap between "*allow aborted/failed checkpoints in an independent sub-graph*" and my intention: "*independent sub-graphs checkpointing independently*".
> > >>
> > >> Best
> > >> Yuan
> > >>
> > >> On Tue, Feb 8, 2022 at 11:34 AM Gen Luo <luogen...@gmail.com> wrote:
> > >>
> > >>> Hi,
> > >>>
> > >>> I'm thinking about Yuan's case. Let's assume that the case is running in current Flink:
> > >>> 1. CP8 finishes
> > >>> 2. For some reason, PR2 stops consuming records from the source (but is not stuck), and PR1 continues consuming new records.
> > >>> 3. CP9 and CP10 finish
> > >>> 4. PR2 starts to consume quickly to catch up with PR1, and reaches the same final status as in Yuan's case before CP11 starts.
> > >>>
> > >>> I suppose that in this case the status of the job can be the same as in Yuan's case, and the snapshot (including source states) taken at CP10 should be the same as the composed global snapshot in Yuan's case, which combines CP10 of PR1 and CP8 of PR2. This should be true if neither failed checkpointing nor uncommitted consuming has side effects, both of which can break the exactly-once semantics when replaying. So I think there should be no difference between rescaling the combined global snapshot and the globally taken one, i.e. if the input partitions are not independent, we are probably not able to rescale the source state in current Flink either.
> > >>>
> > >>> And @Thomas, I do agree that the operational burden is significantly reduced, while I'm a little afraid that checkpointing the subgraphs individually may bring back most of the runtime overhead. Maybe we can find a better way to implement this.
> > >>>
> > >>> On Tue, Feb 8, 2022 at 5:11 AM Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Hi,
> > >>>>
> > >>>> Thanks for opening this discussion! The proposed enhancement would be interesting for use cases in our infrastructure as well.
> > >>>>
> > >>>> There are scenarios where it makes sense to have multiple disconnected subgraphs in a single job because it can significantly reduce the operational burden as well as the runtime overhead. Since we allow subgraphs to recover independently, why not allow them to make progress independently as well? That would imply that checkpointing must succeed for non-affected subgraphs, as certain behavior is tied to checkpoint completion, like Kafka offset commits, file output, etc.
> > >>>>
> > >>>> As for source offset redistribution, the offset/position needs to be tied to the splits, in both FLIP-27 and legacy sources. (This applies to the Kafka and Kinesis legacy sources as well as the FLIP-27 Kafka source.) With the new source framework, it would be hard to implement a source with correct behavior that does not track the position along with the split.
> > >>>>
> > >>>> In Yuan's example, is there a reason why CP8 could not be promoted to CP10 by the coordinator for PR2 once it receives the notification that CP10 did not complete? It appears that should be possible since in its effect it should be no different than no data processed between CP8 and CP10?
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
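A minimal sketch of what Thomas describes above, i.e. tracking the position along with the split in the new source framework. Only the `SourceSplit` interface below is Flink's actual FLIP-27 API; the class name and fields are hypothetical illustrations, not the real Kafka connector split:

```java
import org.apache.flink.api.connector.source.SourceSplit;

// Hypothetical FLIP-27-style split: the read position travels with the split,
// so checkpointing a split captures everything needed to resume it, independently
// of other splits (and therefore of other pipeline regions).
public class PartitionSplit implements SourceSplit {

    private final String topic;        // illustrative: e.g. a Kafka topic
    private final int partition;       // the partition this split covers
    private final long currentOffset;  // position within the partition

    public PartitionSplit(String topic, int partition, long currentOffset) {
        this.topic = topic;
        this.partition = partition;
        this.currentOffset = currentOffset;
    }

    @Override
    public String splitId() {
        return topic + "-" + partition;
    }

    // A reader would snapshot the split with its latest position on each checkpoint.
    public PartitionSplit withOffset(long newOffset) {
        return new PartitionSplit(topic, partition, newOffset);
    }
}
```

Because each split is self-contained in this way, redistributing splits on rescaling does not require consulting the state of any other split, which is the assumption the offset-redistribution discussion above hinges on.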
> > >>>> On Mon, Feb 7, 2022 at 2:36 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>
> > >>>>> Thanks for the clarification Yuan and Gen,
> > >>>>>
> > >>>>> I agree that the checkpointing of the sources needs to support the rescaling case, otherwise it does not work. Is there currently a source implementation where this wouldn't work? For Kafka it should work because we store the offset per assigned partition. For Kinesis it is probably the same. For the FileSource we store the set of unread input splits in the source coordinator and the state of the assigned splits in the sources. This should probably also work since new splits are only handed out to running tasks.
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Till
> > >>>>>
> > >>>>> On Mon, Feb 7, 2022 at 10:29 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hey Till,
> > >>>>>>
> > >>>>>>> Why rescaling is a problem for pipelined regions/independent execution subgraphs:
> > >>>>>>
> > >>>>>> Take a simplified example:
> > >>>>>> job graph: source (2 instances) -> sink (2 instances)
> > >>>>>> execution graph:
> > >>>>>> source (1/2) -> sink (1/2) [pipelined region 1]
> > >>>>>> source (2/2) -> sink (2/2) [pipelined region 2]
> > >>>>>>
> > >>>>>> Let's assume checkpoints are still triggered globally, meaning different pipelined regions share the global checkpoint id (PR1 CP1 matches with PR2 CP1).
> > >>>>>>
> > >>>>>> Now let's assume PR1 completes CP10 and PR2 completes CP8.
> > >>>>>>
> > >>>>>> Let's say we want to rescale to parallelism 3 due to increased input.
> > >>>>>> - Notice that we cannot simply rescale based on the latest completed checkpoint (CP8), because PR1 has already had data (CP8 -> CP10) output externally.
> > >>>>>> - Can we take CP10 from PR1 and CP8 from PR2? I think it depends on how the source's offset redistribution is implemented. The answer is yes if we treat each input partition as independent from the others, *but I am not sure whether we can make that assumption*. If not, the rescaling cannot happen until PR1 and PR2 are aligned on CPs.
> > >>>>>>
> > >>>>>> Best
> > >>>>>> -Yuan
> > >>>>>>
> > >>>>>> On Mon, Feb 7, 2022 at 4:17 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>>
> > >>>>>>> Yuan and Gen, could you elaborate why rescaling is a problem if we say that separate pipelined regions can take checkpoints independently? Conceptually, I somehow think that a pipelined region that has failed and cannot create a new checkpoint is more or less the same as a pipelined region that didn't get new input, or a very, very slow pipelined region which couldn't read new records since the last checkpoint (assuming that the checkpoint coordinator can create a global checkpoint by combining individual checkpoints, e.g. taking the last completed checkpoint from each pipelined region). If this comparison is correct, then this would mean that we have rescaling problems under the latter two cases.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Till
> > >>>>>>>
> > >>>>>>> On Mon, Feb 7, 2022 at 8:55 AM Gen Luo <luogen...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Gyula,
> > >>>>>>>>
> > >>>>>>>> Thanks for sharing the idea. As Yuan mentioned, I think we can discuss this within two scopes. One is the job subgraph, the other is the execution subgraph, which I suppose is the same as a PipelineRegion.
> > >>>>>>>>
> > >>>>>>>> An idea is to individually checkpoint the PipelineRegions, for recovery within a single run.
> > >>>>>>>>
> > >>>>>>>> Flink now supports PipelineRegion-based failover, using a subset of a global checkpoint snapshot. The checkpoint barriers are spread within a PipelineRegion, so the checkpointing of individual PipelineRegions is actually independent. Since the PipelineRegions are fixed within a single run of a job, we can individually checkpoint separate PipelineRegions, regardless of the status of the other PipelineRegions, and use a snapshot of a failing region to recover, instead of the subset of a global snapshot. This can support separated job subgraphs as well, since they will also be separated into different PipelineRegions. I think this can fulfill your needs.
> > >>>>>>>>
> > >>>>>>>> In fact the individual snapshots of all PipelineRegions can form a global snapshot, and the alignment of snapshots of individual regions is not necessary. But rescaling this global snapshot can be potentially complex. I think it's better to use the individual snapshots within a single run, and take a global checkpoint/savepoint before restarting the job, whether rescaling it or not.
> > >>>>>>>>
> > >>>>>>>> A major issue of this plan is that it breaks the checkpoint mechanism of Flink. As far as I know, even in approximate recovery, the snapshot used to recover a single task is still part of a global snapshot. To implement the individual checkpointing of PipelineRegions, there may need to be a checkpoint coordinator for each PipelineRegion, and a new global checkpoint coordinator. When the scale goes up, there can be many individual regions, which can be a big burden on the job manager. The meaning of the checkpoint id will also change, which can affect many aspects. There can be lots of work and risks, and the risks still exist if we only individually checkpoint separated job subgraphs, since the mechanism is still broken. If that is what you need, maybe separating them into different jobs is an easier and better choice, as Caizhi and Yuan mentioned.
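A rough sketch of the idea discussed above, combining the last completed checkpoint of each pipeline region into one global restore view. All types here are hypothetical; no such coordinator-side helper exists in Flink today:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical: each pipeline region keeps its own history of completed checkpoints;
// a "global" snapshot for restore is composed by picking the latest completed
// checkpoint of every region (e.g. CP10 from PR1 and CP8 from PR2).
public class GlobalSnapshotComposer {

    /** completedPerRegion: regionId -> (checkpointId -> handle of that region's state). */
    public static Map<String, String> composeGlobalSnapshot(
            Map<String, NavigableMap<Long, String>> completedPerRegion) {

        Map<String, String> globalView = new HashMap<>();
        for (Map.Entry<String, NavigableMap<Long, String>> entry : completedPerRegion.entrySet()) {
            NavigableMap<Long, String> completed = entry.getValue();
            if (!completed.isEmpty()) {
                // The regions' checkpoint ids need not match.
                globalView.put(entry.getKey(), completed.lastEntry().getValue());
            }
        }
        return globalView;
    }
}
```

As the thread points out, such a composition is only safe for rescaling if no state is effectively shared across regions (for example, purely per-split source state); otherwise the regions' checkpoints would first have to be aligned.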
> > >>>>>>>> On Mon, Feb 7, 2022 at 11:39 AM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hey Gyula,
> > >>>>>>>>>
> > >>>>>>>>> That's a very interesting idea. The discussion about `Individual` vs `Global` checkpoints was raised before, but the main concerns were from two aspects:
> > >>>>>>>>>
> > >>>>>>>>> - Non-deterministic replaying may lead to an inconsistent view of the checkpoint.
> > >>>>>>>>> - It is not easy to form a clear cut between past and future, and hence there is no clear cut of where the source should start to replay from.
> > >>>>>>>>>
> > >>>>>>>>> Starting from independent subgraphs as you proposed may be a good starting point. However, when we talk about a subgraph, do we mean a job subgraph (each vertex is one or more operators) or an execution subgraph (each vertex is a task instance)?
> > >>>>>>>>>
> > >>>>>>>>> If it is a job subgraph, then indeed, why not separate it into multiple jobs as Caizhi mentioned.
> > >>>>>>>>> If it is an execution subgraph, then it is difficult to handle rescaling due to inconsistent views of checkpoints between tasks of the same operator.
> > >>>>>>>>>
> > >>>>>>>>> `Individual/Subgraph Checkpointing` is definitely an interesting direction to think of, and I'd love to hear more from you!
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Yuan
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 7, 2022 at 10:16 AM Caizhi Weng <tsreape...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Gyula!
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for raising this discussion. I agree that this would be an interesting feature, but I actually have some doubts about the motivation and use case. If there are multiple individual subgraphs in the same job, why not just distribute them to multiple jobs, so that each job contains only one individual graph and can now fail without disturbing the others?
> > >>>>>>>>>>
> > >>>>>>>>>> Gyula Fóra <gyf...@apache.org> wrote on Mon, Feb 7, 2022, at 05:22:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi all!
> > >>>>>>>>>>>
> > >>>>>>>>>>> At the moment checkpointing only works for healthy jobs with all running (or some finished) tasks. This sounds reasonable in most cases, but there are a few applications where it would make sense to checkpoint failing jobs as well.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Due to how the checkpointing mechanism works, subgraphs that have a failing task cannot be checkpointed without violating the exactly-once semantics. However, if the job has multiple independent subgraphs (that are not connected to each other), even if one subgraph is failing, the other, completely running one could be checkpointed. In these cases the tasks of the failing subgraph could simply inherit the last successful checkpoint metadata (from before they started failing). This logic would produce a consistent checkpoint.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The job as a whole could now make stateful progress even if some subgraphs are constantly failing. This can be very valuable if for some reason the job has a larger number of independent subgraphs that are expected to fail every once in a while, or if some subgraphs can have longer downtimes that would currently cause the whole job to stall.
> > >>>>>>>>>>>
> > >>>>>>>>>>> What do you think?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Gyula
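To make the proposal above concrete, here is a hypothetical sketch of the finalization step Gyula describes: when completing a checkpoint, every independent subgraph whose tasks did not acknowledge it simply carries forward its state references from its last successful checkpoint. The types and method are made up for illustration and are not an existing Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "failing subgraphs inherit the last successful checkpoint
// metadata": the new checkpoint reuses the previous state handle for every subgraph
// that could not acknowledge the current checkpoint.
public class SubgraphCheckpointAssembler {

    /**
     * @param acknowledgedState   subgraphId -> state handle acked for the current checkpoint
     * @param lastSuccessfulState subgraphId -> state handle from the last completed checkpoint
     *                            (assumed to contain an entry for every subgraph)
     */
    public static Map<String, String> finalizeCheckpoint(
            Map<String, String> acknowledgedState,
            Map<String, String> lastSuccessfulState) {

        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : lastSuccessfulState.entrySet()) {
            String subgraphId = entry.getKey();
            String newState = acknowledgedState.get(subgraphId);
            if (newState != null) {
                // Healthy subgraph: all of its tasks acknowledged the current checkpoint.
                result.put(subgraphId, newState);
            } else {
                // Failing subgraph: inherit the metadata of its last successful checkpoint.
                // This stays consistent only because the subgraph shares no state or
                // connections with the healthy subgraphs.
                result.put(subgraphId, entry.getValue());
            }
        }
        return result;
    }
}
```

Restoring the failing subgraph from such a composed checkpoint would then behave as if that subgraph had simply processed no data since its last successful checkpoint, which is the consistency argument made earlier in the thread.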