Thanks for the explanation, Yun and Guowei. I have to admit that I do not fully understand why this is strictly required, but I think we are touching two very important aspects which might have far-reaching consequences for how Flink works:

1) Do we want to allow that multiple checkpoints are required to materialize results?
2) Do we want to allow emitting records in notifyCheckpointComplete?

For 1) I am not sure whether this has been discussed within the community sufficiently. Requiring multiple checkpoints to materialize a result because of multi-level committers has the consequence that we increase the latency from one checkpoint interval to #levels * checkpoint interval. Moreover, having to drain the pipeline in multiple steps would break stop-with-savepoint --drain, because which savepoint do you report to the user?
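Just to make the latency point concrete with made-up numbers (only the proportions matter here):

    // Hypothetical figures, purely illustrative:
    long checkpointInterval = 60_000L; // checkpoint every minute
    int committerLevels = 2;           // e.g. Committer -> GlobalCommitter
    // Each level needs one more completed checkpoint before its output is
    // final, so visibility latency grows linearly with the number of levels:
    long visibilityLatency = committerLevels * checkpointInterval; // ~2 min instead of ~1 min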
For 2) allowing to send records after the final notifyCheckpointComplete will effectively mean that we need to shut down a topology in multiple steps (in the worst case one operator per checkpoint). To me this would be a strong argument for not allowing it. The fact that users can send records after notifyCheckpointComplete is more by accident than by design. I think we should make this a very deliberate decision and, in doubt, I would be in favour of a more restrictive model unless there is a very good reason why this should be supported.

Taking also the discussion in FLINK-21133 [1] into account, it seems to me that we haven't really understood what kind of guarantees we want to give to our users and how the final checkpoint should exactly work. I understand that this is not included in the first scope of FLIP-147, but I think it is so important that we should figure it out asap. Also because the exact shutdown behaviour will have to be aligned with the lifecycle of a Task/StreamTask/StreamOperator. And last but not least because other features such as the new sink API start building upon a shutdown model which has not been fully understood/agreed upon.

[1] https://issues.apache.org/jira/browse/FLINK-21133

Cheers,
Till

On Tue, Feb 16, 2021 at 9:45 AM Guowei Ma <guowei....@gmail.com> wrote:

> Thanks Yun for the detailed explanation.
>
> A simple supplementary explanation about the sink case: maybe we could use the `OperatorCoordinator` to avoid sending the element to the downstream operator. But I agree we could not limit the users not to emit records in `notifyCheckpointComplete`.
>
> Best,
> Guowei
>
> On Tue, Feb 16, 2021 at 2:06 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
> > Hi all,
> >
> > I'd like to first detail the issue with emitting records in notifyCheckpointComplete, for context. As a specific usage, a sink might want to write some metadata after all the transactions are committed (like writing a marker file _SUCCESS to the output directory). This case is currently supported via the two-level committers of the new sink API: when it receives endOfInput(), the Committer waits for another checkpoint to commit all the pending transactions and emits the list of files to the GlobalCommitter. The GlobalCommitter then waits for yet another checkpoint to also write the metadata with 2pc. (Although sometimes 2pc is not needed for writing the metadata, that would only be an optimization and still requires the Committer to commit before notifying the GlobalCommitter. Another note is that the GlobalCommitter is also added for some other cases, like sinks that want a committer with dop = 1, such as the IcebergSink.)
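> > To make it concrete, a rough sketch of the kind of GlobalCommitter logic I mean (class and method names are illustrative only, not the exact Sink API):
> >
> >     import java.io.IOException;
> >     import java.nio.file.Files;
> >     import java.nio.file.Path;
> >     import java.util.List;
> >
> >     // Runs with dop = 1; only invoked after the parallel Committers have
> >     // committed their transactions in a previous checkpoint.
> >     class MarkerWritingGlobalCommitter {
> >         private final Path outputDir;
> >
> >         MarkerWritingGlobalCommitter(Path outputDir) {
> >             this.outputDir = outputDir;
> >         }
> >
> >         // committedFiles: the list of files forwarded by the Committers.
> >         void commit(List<String> committedFiles) throws IOException {
> >             // The data itself is already visible at this point, so the
> >             // marker can only ever appear after the data it describes.
> >             Files.write(outputDir.resolve("_SUCCESS"), committedFiles);
> >         }
> >     }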
> > However, a more general issue to me is that currently we do not limit users from emitting records in notifyCheckpointComplete at the API level. The sink case could be viewed as a special case, but in addition to this one, logically users could also implement their own cases that emit records in notifyCheckpointComplete.
> >
> > Best,
> > Yun
> >
> > ------------------Original Mail ------------------
> > Sender: Arvid Heise <ar...@apache.org>
> > Send Date: Fri Feb 12 20:46:04 2021
> > Recipients: dev <dev@flink.apache.org>
> > CC: Yun Gao <yungao...@aliyun.com>
> > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> >
> > Hi Piotr,
> >
> > Thank you for raising your concern. Unfortunately, I do not have a better idea than doing the closing of operators intermittently with checkpoints (= multiple last checkpoints).
> >
> > However, two ideas on how to improve the overall user experience:
> > 1. If an operator is not relying on notifyCheckpointComplete, we can close it faster (without waiting for a checkpoint). In general, I'd assume that almost all non-sinks behave that way.
> > 2. We may increase the checkpointing frequency for the last checkpoints. We need to avoid overloading checkpoint storages and task managers, but I assume the more operators are closed, the lower the checkpointing interval can be.
> >
> > For 1, I'd propose to add to StreamOperator (name TBD):
> >
> >     default boolean requiresFinalCheckpoint() {
> >         return true;
> >     }
> >
> > This means all operators are conservatively (= slowly) closed. For most operators, we can then define their behavior by overriding in AbstractUdfStreamOperator:
> >
> >     @Override
> >     public boolean requiresFinalCheckpoint() {
> >         return userFunction instanceof CheckpointListener;
> >     }
> >
> > This idea can be further refined by also adding requiresFinalCheckpoint to CheckpointListener, to exclude all operators with UDFs that implement CheckpointListener but do not need it for 2pc:
> >
> >     @Override
> >     public boolean requiresFinalCheckpoint() {
> >         return userFunction instanceof CheckpointListener &&
> >             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> >     }
> >
> > That approach would also work for state backends/snapshot strategies that require some 2pc.
> >
> > If we can contain it to the @PublicEvolving StreamOperator, it would be better of course.
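> > Spelled out, the refined listener interface could look roughly like this (a sketch only; the method name is TBD and this is not part of the current API):
> >
> >     public interface CheckpointListener {
> >         void notifyCheckpointComplete(long checkpointId) throws Exception;
> >
> >         // Opt-out hook: listeners that use notifications only for e.g.
> >         // metrics or logging can return false, letting their operator
> >         // close without waiting for a final checkpoint.
> >         default boolean requiresFinalCheckpoint() {
> >             return true;
> >         }
> >     }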
> > Best,
> >
> > Arvid
> >
> > On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski wrote:
> >
> > > Hey,
> > >
> > > I would like to raise a concern about the implementation of the final checkpoints, taking into account operators/functions that implement the two-phase commit (2pc) protocol for exactly-once processing with some external state (kept outside of Flink) -- primarily exactly-once sinks.
> > >
> > > First of all, as I understand it, this is not planned for the first version of this FLIP. I'm fine with that, however I would strongly emphasize this in every place we will be mentioning the FLIP-147 efforts. This is because, as a user, upon hearing "Flink supports checkpointing with bounded inputs" I would expect 2pc to work properly and to commit the external side effects upon finishing. As it is now, I (as a user) would be surprised by a silent data loss (of the not-committed trailing data). This is just a remark that we need to attach this warning to every blog post/documentation/user mailing list response related to "Support Checkpoints After Tasks Finished". Also, I would suggest prioritizing the follow-up of supporting 2pc.
> > >
> > > Secondly, I think we are missing how difficult and problematic 2pc support with the final checkpoint will be.
> > >
> > > For starters, keep in mind that currently 2pc can be implemented by users using both `@Public` APIs as functions and `@PublicEvolving` operators in any place in the job graph. It's not limited to only the sinks. For example, users could easily implement an `AsyncFunction` (for the `AsyncWaitOperator`) that is using 2pc based on the `CheckpointListener` interface. I'm not saying it's common, probably just a tiny minority of users are doing that (if any at all), but nevertheless that's possible and currently (implicitly?) supported in Flink.
> > >
> > > The next complication is the support of bounded streams (`BoundedOneInput` or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the operators. Currently it works as follows:
> > >
> > > 0. Task receives EndOfPartitionEvent (or the source finishes)
> > > 1. `endOfInput` is called on the first operator in the chain
> > > 2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
> > > 3. We wait for the already fired timers to finish executing (spinning mailbox loop)
> > > 4. We close the first operator
> > > 5. We go to the next (second) operator in the chain and repeat steps 1. to 5.
> > >
> > > This is because operators can emit data after processing `endOfInput`: from timers, from async mailbox actions, and inside the `close` method itself.
> > >
> > > Now the problem is that to support the final checkpoint with 2pc, we need to trigger the `snapshotState` and `notifyCheckpointComplete` calls at the very least only after the `endOfInput` call on the operator. Probably the best place would be in between steps 3. and 4. However, that means we would be forced to wait for steps 1. to 3. to finish, then wait for a next checkpoint to trigger AND complete, before finally closing the head operator, and only then we can start closing the next operator in the chain:
> > >
> > > 0. Task receives EndOfPartitionEvent (or the source finishes)
> > > 1. `endOfInput` is called on the first operator in the chain
> > > 2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
> > > 3. We wait for the already fired timers to finish executing (spinning mailbox loop)
> > > *3b. We wait for one more checkpoint to trigger and for the `notifyCheckpointComplete` RPC.*
> > > 4. We close the first operator
> > > 5. We go to the next (second) operator in the chain and repeat steps 1. to 5.
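> > > In pseudo-code, the resulting shutdown loop would look something like the following (the method names are made up for illustration; this is not the actual StreamOperatorWrapper code):
> > >
> > >     // Close the chain head to tail; every iteration now stalls on a
> > >     // full checkpoint round-trip before the operator may be closed.
> > >     for (StreamOperatorWrapper<?> operator : chainFromHeadToTail) {
> > >         operator.endOfInput();                    // step 1
> > >         operator.quiesceProcessingTimers();       // step 2
> > >         operator.waitForFiredTimers();            // step 3 (mailbox loop)
> > >         waitForCheckpointTriggerAndCompletion();  // step 3b: trigger + notifyCheckpointComplete RPC
> > >         operator.close();                         // step 4
> > >     }                                             // step 5: continue with the next operator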
> > > That means we can close one operator per successful checkpoint. To close 10 operators, we would need 10 successful checkpoints.
> > >
> > > I was thinking about different approaches to this problem, and I couldn't find any viable ones. All I could think of would break the current `@Public` API and/or would be ugly/confusing for the users.
> > >
> > > For example, a relatively simple solution -- introducing a `preClose` or `flush` method to the operators, with a contract that after `flush` operators would be forbidden from emitting more records, so that we could replace step 4. with this `flush` call and then have a single checkpoint to finish 2pc for all of the operators inside the chain -- doesn't work. The sheer fact of adding this `flush` method and changing the contract would break the current API, and Yun Gao has pointed out to me that we either already support, or want to support, operators that are emitting records from within the `notifyCheckpointComplete` call:
> > >
> > > > Yun Gao: like with the new sink api there might be writer -> committer -> global committer; the committer would need to wait for the last checkpoint to commit the last piece of data, and after that it also needs to emit the list of transactions that got committed to the global committer to do some finalization logic.
> > >
> > > So it wouldn't solve the problem (at least not fully).
> > >
> > > I don't know if anyone has any better ideas how to solve this problem?
> > >
> > > Piotrek
> > >
> > > On Fri, 15 Jan 2021 at 14:57, Yun Gao wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I think so, since we seem to not have any other divergence or new objections now. I'll open the vote then. Very thanks!
> > > >
> > > > Best,
> > > > Yun
> > > >
> > > > ------------------------------------------------------------------
> > > > From: Aljoscha Krettek
> > > > Send Time: 2021 Jan. 15 (Fri.) 21:24
> > > > To: dev
> > > > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> > > >
> > > > Thanks for the summary! I think we can now move towards a [VOTE] thread, right?
> > > >
> > > > On 2021/01/15 13:43, Yun Gao wrote:
> > > > > 1) For the problem that the "new" root task coincidently finished before getting triggered successfully, we have listed two options in the FLIP-147 [1]; for the first version, now we are not tend to go with the first option that JM would re-compute and re-trigger new sources when it realized some tasks are not triggered successfully. This option would avoid the complexity of adding new RPC and duplicating task states, and in the average case it would not cause too much overhead.
> > > >
> > > > You wrote "we are *not* tend to go with the first option", but I think you meant to write "we tend to *now* go with the first option", right? That's also how it is in the FLIP, I just wanted to clarify for the mailing list.