Hey,

I would like to raise a concern about the implementation of final
checkpoints with respect to operators/functions that implement the
two-phase commit (2PC) protocol for exactly-once processing with some
external state (kept outside of Flink), primarily exactly-once sinks.

First of all, as I understand it, this is not planned for the first version
of this FLIP. I'm fine with that, however I would strongly emphasize this
in every place where we mention the FLIP-147 efforts. This is because I,
as a user, upon hearing "Flink supports checkpointing with bounded inputs",
would expect 2PC to work properly and to commit the external side effects
upon finishing. As it is now, I (as a user) would be surprised by
silent data loss (of the uncommitted trailing data). This is just a remark
that we need to attach this warning to every blog post/documentation/user
mailing list response related to "Support Checkpoints After Tasks
Finished". I would also suggest prioritizing the follow-up work of
supporting 2PC.

Secondly, I think we are overlooking how difficult and problematic 2PC
support with the final checkpoint will be.

For starters, keep in mind that currently 2PC can be implemented by users,
via both `@Public` API functions and `@PublicEvolving` operators, in
any place in the job graph; it's not limited to only the sinks. For
example, users could easily implement an `AsyncFunction` (for
`AsyncWaitOperator`) that uses 2PC based on the `CheckpointListener`
interface. I'm not saying it's common, probably just a tiny minority of
users are doing that (if any at all), but nevertheless it's possible and
currently (implicitly?) supported in Flink.
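To make this concrete, here is a minimal sketch of such a user function. The interfaces below are simplified stand-ins, not real Flink APIs (the actual `AsyncFunction`/`AsyncWaitOperator` machinery is elided); the point is only to show the 2PC pattern built on checkpoint completion notifications:

```java
import java.util.*;

// Simplified stand-in for Flink's CheckpointListener (illustration only).
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId);
}

// A user function that stages external side effects per checkpoint and
// only commits them once the checkpoint completes (the 2PC pattern).
class TwoPhaseCommitAsyncFunction implements CheckpointListener {
    private final List<String> currentTransaction = new ArrayList<>();
    private final NavigableMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    // Called per element (stand-in for AsyncFunction#asyncInvoke):
    // stage the side effect in the currently open transaction.
    void asyncInvoke(String record) {
        currentTransaction.add(record);
    }

    // Pre-commit: on a checkpoint, freeze the open transaction under
    // that checkpoint's id.
    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(currentTransaction));
        currentTransaction.clear();
    }

    // Commit: once the checkpoint completes, make all transactions up to
    // (and including) that checkpoint visible externally.
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, List<String>> done =
                pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> transaction : done.values()) {
            committed.addAll(transaction);
        }
        done.clear();
    }

    List<String> getCommitted() { return committed; }

    // Records that would be silently lost if the job finished without a
    // final checkpoint and its completion notification.
    int uncommittedCount() {
        return currentTransaction.size()
                + pendingPerCheckpoint.values().stream().mapToInt(List::size).sum();
    }
}
```

Note how any record staged after the last completed checkpoint stays uncommitted, which is exactly the trailing-data-loss scenario described above.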

The next complication is the support for bounded streams (`BoundedOneInput`
or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the
operators. Currently it works as follows:
0. Task receives EndOfPartitionEvent (or the source finishes)
1. `endOfInput` is called on the first operator in the chain
2. We quiesce the processing timers
(`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
operator, so no new timers will be triggered
3. We wait for the already fired timers to finish executing (spinning the
mailbox loop)
4. We close the first operator
5. We go to the next (second) operator in the chain and repeat steps 1.
to 5.

This is because operators can emit data after processing `endOfInput`: from
timers, from async mailbox actions, and inside the `close` method itself.
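The shutdown procedure above can be sketched as a small simulation. This is not Flink code and the method names (`quiesceTimers`, `drainMailbox`, etc.) are made up for illustration; the point is the strictly sequential, per-operator ordering:

```java
import java.util.*;

// Illustrative simulation of the current shutdown procedure: each
// operator in the chain runs through endOfInput -> quiesce timers ->
// drain mailbox -> close before the next operator is touched.
class ChainShutdownSketch {
    interface ChainedOperator {
        String name();  // sole abstract method, so a lambda can supply it
        default void endOfInput(List<String> log)    { log.add(name() + ":endOfInput"); }
        default void quiesceTimers(List<String> log) { log.add(name() + ":quiesceTimers"); }
        default void drainMailbox(List<String> log)  { log.add(name() + ":drainMailbox"); }
        default void close(List<String> log)         { log.add(name() + ":close"); }
    }

    static List<String> closeChain(List<ChainedOperator> chain) {
        List<String> log = new ArrayList<>();
        for (ChainedOperator op : chain) {  // head of the chain first
            op.endOfInput(log);             // 1. end of input
            op.quiesceTimers(log);          // 2. no new processing timers
            op.drainMailbox(log);           // 3. let already-fired timers finish
            op.close(log);                  // 4. close; may still emit downstream
        }                                   // 5. repeat for the next operator
        return log;
    }
}
```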

Now the problem is that, to support the final checkpoint with 2PC, we need
to trigger the `snapshotState` and `notifyCheckpointComplete` calls at the
very least only after the `endOfInput` call on the operator. Probably the
best place would be between steps 3. and 4. However, that means we would be
forced to wait for steps 1. to 3. to finish, then wait for the next
checkpoint to trigger AND complete, before finally closing the head
operator, and only then could we start closing the next operator in the
chain:

0. Task receives EndOfPartitionEvent (or the source finishes)
1. `endOfInput` is called on the first operator in the chain
2. We quiesce the processing timers
(`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first
operator, so no new timers will be triggered
3. We wait for the already fired timers to finish executing (spinning the
mailbox loop)
*3b. We wait for one more checkpoint to trigger and for the
`notifyCheckpointComplete` RPC.*
4. We close the first operator
5. We go to the next (second) operator in the chain and repeat steps 1.
to 5.

That means we can close one operator per successful checkpoint. To close
10 operators, we would need 10 successful checkpoints.
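As a toy model (not Flink code), the cost of step 3b. is linear in the chain length, because the wait happens once per operator:

```java
// Toy model of the procedure with step 3b. added: each operator must
// observe one more checkpoint trigger AND complete between draining its
// mailbox and closing, so closing a chain of N operators consumes N
// successful checkpoints.
class FinalCheckpointCostSketch {
    static int checkpointsNeededToClose(int chainLength) {
        int completedCheckpoints = 0;
        for (int op = 0; op < chainLength; op++) {
            // steps 1.-3.: endOfInput, quiesce timers, drain mailbox (elided)
            completedCheckpoints++; // 3b. wait for a checkpoint + notifyCheckpointComplete
            // step 4.: close this operator, then move on to the next one
        }
        return completedCheckpoints;
    }
}
```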

I was thinking about different approaches to this problem, and I couldn't
find any viable ones. All I could think of would break the current
`@Public` API and/or would be ugly/confusing for the users.

For example, a relatively simple solution would be to introduce a `preClose`
or `flush` method on the operators, with a contract that after `flush`
operators would be forbidden from emitting more records. We could then
replace step 4. with this `flush` call and have a single checkpoint finish
2PC for all of the operators inside the chain. Unfortunately, this doesn't
work. The sheer fact of adding this `flush` method and changing the
contract would break the current API, and Yun Gao has pointed out to me
that we either already support, or want to support, operators that emit
records from within the `notifyCheckpointComplete` call:

> Yun Gao:
> like with the new sink api there might be writer -> committer -> global
> committer, the committer would need to wait for the last checkpoint to
> commit the last piece of data, and after that it also need to emit the
> list of transactions get committed to global committer to do some
> finalization logic.

So it wouldn't solve the problem (at least not fully).
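To illustrate the conflict, here is a hedged sketch of the rejected `flush` contract. The names are made up, not a Flink API; it shows why a committer-style operator, which needs to emit from within `notifyCheckpointComplete`, cannot coexist with a "no emissions after flush" rule:

```java
import java.util.*;

// Sketch of the rejected `flush` contract: after flush() the operator
// must not emit any more records. The conflict: a committer-style
// operator wants to emit the list of committed transactions from
// *within* notifyCheckpointComplete, which necessarily happens after
// the final flush.
class FlushContractSketch {
    private final List<String> emitted = new ArrayList<>();
    private boolean flushed = false;

    // Returns false when the emission is rejected by the flush contract.
    boolean emit(String record) {
        if (flushed) {
            return false;  // contract: no records after flush()
        }
        emitted.add(record);
        return true;
    }

    void flush() { flushed = true; }

    // What the new sink's committer would like to do for the last piece
    // of data: emit to the global committer on checkpoint completion.
    boolean notifyCheckpointComplete(long checkpointId) {
        return emit("transactions-committed-for-checkpoint-" + checkpointId);
    }

    List<String> getEmitted() { return emitted; }
}
```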

Does anyone have any better ideas on how to solve this problem?

Piotrek

pt., 15 sty 2021 o 14:57 Yun Gao <yungao...@aliyun.com.invalid> napisaƂ(a):

> Hi Aljoscha,
>
>     I think so since we seems to do not have other divergence and new
> objections now. I'll open the vote then. Very thanks!
>
> Best,
>  Yun
>
>
> ------------------------------------------------------------------
> From:Aljoscha Krettek <aljos...@apache.org>
> Send Time:2021 Jan. 15 (Fri.) 21:24
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
> Thanks for the summary! I think we can now move towards a [VOTE] thread,
> right?
>
> On 2021/01/15 13:43, Yun Gao wrote:
> >1) For the problem that the "new" root task coincidently finished
> >before getting triggered successfully, we have listed two options in
> >the FLIP-147[1], for the first version, now we are not tend to go with
> >the first option that JM would re-compute and re-trigger new sources
> >when it realized some tasks are not triggered successfully. This option
> >would avoid the complexity of adding new PRC and duplicating task
> >states, and in average case it would not cause too much overhead.
>
> You wrote "we are *not* tend to go with the first option", but I think
> you meant wo write "we tend to *now* go with the first option", right?
> That's also how it is in the FLIP, I just wanted to clarify for the
> mailing list.
>
>