Hey, I would like to raise a concern about the implementation of the final checkpoints with respect to operators/functions that implement the two-phase commit (2pc) protocol for exactly-once processing with some external state (kept outside of Flink), primarily exactly-once sinks.
First of all, as I understand it, this is not planned for the first version of this FLIP. I'm fine with that, however I would strongly emphasize it in every place where we mention the FLIP-147 effort. This is because I, as a user, upon hearing "Flink supports checkpointing with bounded inputs", would expect 2pc to work properly and to commit the external side effects upon finishing. As it is now, I (as a user) would be surprised by a silent data loss (of the not-yet-committed trailing data). This is just a remark that we need to attach this warning to every blog post/documentation/user mailing list response related to "Support Checkpoints After Tasks Finished". I would also suggest prioritizing the follow-up of supporting 2pc.

Secondly, I think we are overlooking how difficult and problematic 2pc support with the final checkpoint will be.

For starters, keep in mind that currently 2pc can be implemented by users using both `@Public` APIs (as functions) and `@PublicEvolving` operators, in any place in the job graph; it is not limited to only the sinks. For example, users could easily implement an `AsyncFunction` (for `AsyncWaitOperator`) that uses 2pc based on the `CheckpointListener` interface (a minimal sketch of what I mean is included further below). I'm not saying it's common, probably just a tiny minority of users are doing that (if any at all), but nevertheless it's possible and currently (implicitly?) supported in Flink.

The next complication is the support of bounded streams (the `BoundedOneInput` or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the operators. Currently it works as follows:

0. Task receives `EndOfPartitionEvent` (or the source finishes)
1. `endOfInput` is called on the first operator in the chain
2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
3. We wait for the already fired timers to finish executing (spinning the mailbox loop)
4. We close the first operator
5. We go to the next (second) operator in the chain and repeat steps 1. to 5.

It works this way because operators can emit data after processing `endOfInput`: from timers, from async mailbox actions, and inside the `close` method itself.

Now the problem is: to support the final checkpoint with 2pc, we need to trigger the `snapshotState` and `notifyCheckpointComplete` calls, at the very least, only after the `endOfInput` call on the operator. Probably the best place would be in between steps 3. and 4. However, that means we would be forced to wait for steps 1. to 3. to finish, then wait for the next checkpoint to trigger AND complete, before finally closing the head operator, and only then could we start closing the next operator in the chain:

0. Task receives `EndOfPartitionEvent` (or the source finishes)
1. `endOfInput` is called on the first operator in the chain
2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
3. We wait for the already fired timers to finish executing (spinning the mailbox loop)
*3b. We wait for one more checkpoint to trigger and for the `notifyCheckpointComplete` RPC.*
4. We close the first operator
5. We go to the next (second) operator in the chain and repeat steps 1. to 5.

That means we can close one operator per successful checkpoint. To close 10 operators, we would need 10 successful checkpoints.
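To make the "one operator per successful checkpoint" point more concrete, here is a rough sketch of what the per-operator closing loop would look like with step 3b. added. This is illustrative Java only, with made-up names (`ChainedOperator`, `awaitNextCompletedCheckpoint`, ...), not the actual `StreamOperatorWrapper` code:

```java
import java.util.List;

// Purely illustrative stand-ins, not Flink's real internals.
interface ChainedOperator {
    void endInput() throws Exception;        // step 1: endOfInput for this operator
    void quiesceTimerService();              // step 2: no new processing timers will fire
    void drainAlreadyFiredTimers();          // step 3: let in-flight timers finish (mailbox loop)
    void close() throws Exception;           // step 4: close the operator
}

interface CheckpointWaiter {
    // Step 3b: blocks until the next checkpoint has been triggered AND its
    // notifyCheckpointComplete RPC has been processed by this task.
    void awaitNextCompletedCheckpoint() throws InterruptedException;
}

final class FinishingWith2pcSketch {
    static void closeChain(List<ChainedOperator> headToTail, CheckpointWaiter checkpoints)
            throws Exception {
        // Step 5 is the loop itself: operators are closed strictly one after another.
        for (ChainedOperator operator : headToTail) {
            operator.endInput();                        // step 1
            operator.quiesceTimerService();             // step 2
            operator.drainAlreadyFiredTimers();         // step 3
            // Step 3b: a full checkpoint cycle is needed before this single
            // operator may be closed, hence one checkpoint per chained operator.
            checkpoints.awaitNextCompletedCheckpoint();
            operator.close();                           // step 4
        }
    }
}
```

With checkpoints triggered on an interval, finishing a chain of N such operators would then take roughly N checkpoint intervals.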
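And coming back to my earlier point that 2pc is not limited to sinks, here is a minimal, purely hypothetical sketch of a user-defined `AsyncFunction` that also implements `CheckpointListener` to drive a 2pc-style commit of external side effects. The `ExternalTransactionClient` is made up, a real implementation would also have to persist its pending transactions in checkpointed state, and the exact package of `CheckpointListener` has moved between Flink versions:

```java
import java.io.Serializable;
import java.util.Collections;

import org.apache.flink.api.common.state.CheckpointListener; // in org.apache.flink.runtime.state in older Flink versions
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;

// Hypothetical user code: pre-commits writes to an external system in asyncInvoke()
// and commits them only once the covering checkpoint completes.
public class TwoPhaseCommitAsyncFunction implements AsyncFunction<Long, Long>, CheckpointListener {

    // Made-up client for some external transactional system.
    public interface ExternalTransactionClient extends Serializable {
        void addToPendingTransaction(Long record);
        void commitPendingTransactions();
    }

    private final ExternalTransactionClient client;

    public TwoPhaseCommitAsyncFunction(ExternalTransactionClient client) {
        this.client = client;
    }

    @Override
    public void asyncInvoke(Long input, ResultFuture<Long> resultFuture) {
        // Phase 1 (pre-commit): write into an open, not-yet-committed external transaction.
        client.addToPendingTransaction(input);
        resultFuture.complete(Collections.singleton(input));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Phase 2 (commit): make the external side effects visible. If this is never
        // called after the last records (no final checkpoint), those records stay
        // pre-committed but never committed, which is the silent data loss mentioned above.
        client.commitPendingTransactions();
    }
}
```

Any such user function, sitting anywhere in the job graph, would run into exactly the problem described above once its task finishes.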
I was thinking about different approaches to this problem, and I couldn't find any viable ones. All I could think of would break the current `@Public` API and/or would be ugly/confusing for the users.

For example, a relatively simple solution would be to introduce a `preClose` or `flush` method on the operators, with a contract that after `flush` an operator would be forbidden from emitting more records. We could then replace step 4. with this `flush` call and have a single checkpoint finish 2pc for all of the operators inside the chain. Unfortunately, this doesn't work. The sheer fact of adding this `flush` method and changing the contract would break the current API, and Yun Gao has pointed out to me that we either already support, or want to support, operators that emit records from within the `notifyCheckpointComplete` call:

> Yun Gao:
> like with the new sink api there might be writer -> committer -> global committer, the committer would need to wait for the last checkpoint to commit
> the last piece of data, and after that it also need to emit the list of transactions get committed to global committer to do some finalization logic.

So it wouldn't solve the problem (at least not fully). (A rough sketch of such a `flush` contract, for illustration only, is at the very bottom of this e-mail, below the quoted thread.)

I don't know if anyone has any better ideas on how to solve this problem?

Piotrek

On Fri, 15 Jan 2021 at 14:57, Yun Gao <yungao...@aliyun.com.invalid> wrote:

> Hi Aljoscha,
>
> I think so since we seems to do not have other divergence and new
> objections now. I'll open the vote then. Very thanks!
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2021 Jan. 15 (Fri.) 21:24
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Thanks for the summary! I think we can now move towards a [VOTE] thread,
> right?
>
> On 2021/01/15 13:43, Yun Gao wrote:
> >1) For the problem that the "new" root task coincidently finished
> >before getting triggered successfully, we have listed two options in
> >the FLIP-147[1], for the first version, now we are not tend to go with
> >the first option that JM would re-compute and re-trigger new sources
> >when it realized some tasks are not triggered successfully. This option
> >would avoid the complexity of adding new PRC and duplicating task
> >states, and in average case it would not cause too much overhead.
>
> You wrote "we are *not* tend to go with the first option", but I think
> you meant to write "we tend to *now* go with the first option", right?
> That's also how it is in the FLIP, I just wanted to clarify for the
> mailing list.
>
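P.S. Just so that we are talking about the same thing, the `flush`/`preClose` idea I dismissed above would boil down to a contract roughly like the following. This is purely illustrative, the name is made up and it is not a proposal for the actual API:

```java
// Purely illustrative; this interface does not exist in Flink and the name is made up.
// It captures the contract that the "flush"/"preClose" idea above would require.
public interface FlushableOperator {

    /**
     * Would be called between steps 3. and 4. of the closing procedure. After this method
     * returns, the operator would be forbidden from emitting any more records, so that a
     * single final checkpoint could complete 2pc for the whole operator chain before the
     * operators are actually closed.
     */
    void flush() throws Exception;
}
```

As Yun pointed out, such a contract is already too strong for the new sink API: the committer has to emit the list of committed transactions to the global committer from within `notifyCheckpointComplete`, i.e. after the final checkpoint has completed.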