Hi Till, Guowei,

Many thanks for initiating the discussion and the deep thoughts!

For notifyCheckpointComplete, I also agree we could try to avoid emitting new records in notifyCheckpointComplete by using OperatorCoordinator for the new sink API. Besides, the Hive sink might also need some modification, since it also emits records in notifyCheckpointComplete.

For unifying the process of stopping with savepoint and finishing because all records are processed, I also agree that unifying would be better if we can achieve it, but I have not fully caught up with the implementation yet. Based on the discussion in FLINK-21133, my understanding is that for stop-with-savepoint, we now want to first stop the source, then trigger a savepoint, and after the source receives notifyCheckpointComplete, the source would start emitting EndOfPartitionEvent to finish the job. Am I correct?
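If that understanding is right, the source-side sequence could be sketched as the following toy model (all class, field, and state names here are invented for illustration; the real logic would live in Flink's source tasks and checkpoint machinery):

```java
// Toy model of the stop-with-savepoint sequence described above:
// 1) stop the source, 2) take a savepoint, 3) on notifyCheckpointComplete,
// finish the job by emitting EndOfPartitionEvent.
enum Phase { RUNNING, STOPPED_WAITING_SAVEPOINT, FINISHING }

class SourceStopper {
    Phase phase = Phase.RUNNING;
    boolean endOfPartitionEmitted = false;

    // Step 1: the source stops emitting new records.
    void stopSource() {
        phase = Phase.STOPPED_WAITING_SAVEPOINT;
    }

    // Step 2 happens externally: a savepoint is triggered while stopped.
    // Step 3: once the savepoint completes, the source finishes the job.
    void notifyCheckpointComplete(long checkpointId) {
        if (phase == Phase.STOPPED_WAITING_SAVEPOINT) {
            phase = Phase.FINISHING;
            endOfPartitionEmitted = true; // stands in for emitting EndOfPartitionEvent
        }
    }
}
```

Note that in this reading, notifyCheckpointComplete arriving before the source was stopped would not finish the job, which matches the ordering assumed above.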

For a normal finish, one difference to me is that with multiple sources we cannot guarantee when the sources will finish. One source might run for 1 minute and another for 1 hour. To unify this with the stop-with-savepoint process, we might need to hold the fast source until all the sources are finished? A coordinator would be introduced to count the number of running sources and trigger the final savepoint / checkpoint. For the extreme case, if we have both bounded and unbounded sources, we might only count how many bounded sources are remaining? And once all the bounded sources are finished, we would trigger the special checkpoint. After the whole bounded part of the graph has finished, the remaining part could still do checkpoints and commit data with FLIP-147.
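To make the counting idea concrete, here is a minimal standalone sketch of such a coordinator (all names are invented for illustration; a real implementation would live in or next to the CheckpointCoordinator and be driven by task status updates):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical coordinator that tracks how many bounded sources are still
// running and triggers one final checkpoint when the last of them finishes.
// Unbounded sources are deliberately not counted: they keep running and keep
// participating in normal checkpoints afterwards.
class BoundedSourceTracker {
    private final AtomicInteger runningBoundedSources;
    private final Runnable triggerFinalCheckpoint; // e.g. a call into the checkpoint coordinator

    BoundedSourceTracker(int boundedSources, Runnable triggerFinalCheckpoint) {
        this.runningBoundedSources = new AtomicInteger(boundedSources);
        this.triggerFinalCheckpoint = triggerFinalCheckpoint;
    }

    /** Called when one bounded source reports that it has finished. */
    void sourceFinished() {
        if (runningBoundedSources.decrementAndGet() == 0) {
            // The last bounded source is done: trigger the special checkpoint.
            triggerFinalCheckpoint.run();
        }
    }
}
```

The point of the sketch is only the ordering guarantee: the special checkpoint fires exactly once, and only after the last bounded source has finished.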

Best,
Yun




 ------------------Original Mail ------------------
Sender: Guowei Ma <guowei....@gmail.com>
Send Date: Wed Feb 24 17:35:36 2021
Recipients: dev <dev@flink.apache.org>
CC: Arvid Heise <ar...@apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Till,

Thank you very much for your careful consideration.



*1. Emit records in `NotifyCheckpointComplete`.*

Sorry for the misunderstanding caused by my expression. I just want to say that the current interface does not prevent users from doing it. From the perspective of the new sink API, we might not depend on emitting records in `NotifyCheckpointComplete`, e.g. by using `OperatorCoordinator` instead.
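A standalone sketch of that direction (class and method names are invented here; the real hook would be Flink's OperatorCoordinator and OperatorEvent machinery):

```java
import java.util.ArrayList;
import java.util.List;

// Instead of the committer emitting a record downstream from
// notifyCheckpointComplete, it reports its committed transactions to a
// coordinator-like component, which runs the global finalization on the
// JobManager side, so no record has to flow through the topology anymore.
class CommitCoordinator {
    final List<String> committedTransactions = new ArrayList<>();
    boolean finalized = false;

    // Stand-in for OperatorCoordinator#handleEventFromOperator: a committer
    // subtask sends the list of transactions it committed as an event.
    void handleCommitEvent(List<String> transactions) {
        committedTransactions.addAll(transactions);
    }

    // Stand-in for OperatorCoordinator#notifyCheckpointComplete: the global
    // finalization (e.g. writing a _SUCCESS marker) happens here.
    void notifyCheckpointComplete(long checkpointId) {
        finalized = true;
    }
}
```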





*2. What does FLIP-147 guarantee?*

I think this FLIP initially wants to achieve two targets:

1. Tasks/operators exit correctly (as you mentioned, the lifecycle of a Task/StreamTask/StreamOperator).
2. Continue to trigger checkpoints after some tasks finish, for mixed jobs.

I think the first point is related to the discussion in FLINK-21133. If I understand correctly, in addition to supporting the tasks / operators exiting correctly, we now also want to unify the process of the tasks and operators for savepoint / finish. I think the second point is orthogonal to FLINK-21133, because there are topologies that have both bounded and unbounded inputs.



*3. How to unify the operator exit process of FLIP-147 with stop-with-savepoint?*

I am not very sure how to do it yet. But if I understand the discussion in the JIRA correctly, it needs to introduce some logic into `CheckpointCoordinator`, which is responsible for triggering "the unified operator exit process". Am I correct?
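As a rough standalone illustration of what such logic could look like (all names are hypothetical, not Flink's actual API), the idea is that both triggers would funnel into one shared exit sequence:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "unified exit" trigger inside a checkpoint-coordinator-like
// component: stop-with-savepoint and end-of-input both funnel into the same
// sequence of (1) stop sources, (2) take a final snapshot, (3) finish tasks.
class UnifiedExitProcess {
    enum Reason { STOP_WITH_SAVEPOINT, ALL_RECORDS_PROCESSED }

    final List<String> steps = new ArrayList<>();

    void triggerExit(Reason reason) {
        steps.add("stop-sources");             // quiesce all sources first
        steps.add("final-snapshot:" + reason); // savepoint or checkpoint
        steps.add("emit-end-of-partition");    // let the tasks finish cleanly
    }
}
```

The only point of the sketch is that the exit steps and their order are identical for both reasons; only the kind of final snapshot differs.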



Best,

Guowei





On Tue, Feb 23, 2021 at 5:10 PM Till Rohrmann wrote:

> Thanks for the explanation Yun and Guowei. I have to admit that I do not fully understand why this is strictly required, but I think that we are touching two very important aspects which might have far-reaching consequences for how Flink works:
>
> 1) Do we want to allow that multiple checkpoints are required to materialize results?
> 2) Do we want to allow to emit records in notifyCheckpointComplete?
>
> For 1) I am not sure whether this has been discussed within the community sufficiently. Requiring multiple checkpoints to materialize a result because of multi-level committers has the consequence that we increase the latency from checkpoint interval to #levels * checkpoint interval. Moreover, having to drain the pipeline in multiple steps would break stop-with-savepoint --drain, because which savepoint do you report to the user?
>
> For 2) allowing to send records after the final notifyCheckpointComplete will effectively mean that we need to shut down a topology in multiple steps (in the worst case one operator per checkpoint). This would be a strong argument for not allowing this to me. The fact that users can send records after the notifyCheckpointComplete is more by accident than by design. I think we should make this a very deliberate decision, and in doubt I would be in favour of a more restrictive model unless there is a very good reason why this should be supported.
>
> Taking also the discussion in FLINK-21133 [1] into account, it seems to me that we haven't really understood what kind of guarantees we want to give to our users and how the final checkpoint should exactly work. I understand that this is not included in the first scope of FLIP-147, but I think this is so important that we should figure this out asap. Also because the exact shutdown behaviour will have to be aligned with the lifecycle of a Task/StreamTask/StreamOperator. And last but not least because other features such as the new sink API start building upon a shutdown model which has not been fully understood/agreed upon.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21133
>
> Cheers,
> Till

>

> On Tue, Feb 16, 2021 at 9:45 AM Guowei Ma wrote:
>
> > Thanks Yun for the detailed explanation.
> > A simple supplementary explanation about the sink case: maybe we could use `OperatorCoordinator` to avoid sending the element to the downstream operator. But I agree we could not limit the users not to emit records in `notifyCheckpointComplete`.
> >
> > Best,
> > Guowei

> > On Tue, Feb 16, 2021 at 2:06 PM Yun Gao wrote:
> >
> > > Hi all,
> > >
> > > I'd like to first detail the issue with emitting records in notifyCheckpointComplete for context. As a specific usage example, a sink might want to write some metadata after all the transactions are committed (like writing a marker file _SUCCESS to the output directory). This case is currently supported via the two-level committers of the new sink API: when endOfInput() is received, the Committer waits for another checkpoint to commit all the pending transactions and emits the list of files to the GlobalCommitter. The GlobalCommitter would wait for one more checkpoint to also write the metadata with 2pc (although sometimes 2pc is not needed for writing metadata, that should be only an optimization and still requires the Committer to commit before notifying the global Committer. Another note: the GlobalCommitter is also added for some other cases, like sinks that want a committer with dop = 1, like IcebergSink).
> > >
> > > However, a more general issue to me is that currently we do not limit users to not emit records in notifyCheckpointComplete at the API level. The sink case could be viewed as a special case, but in addition to this one, logically users could also implement their own cases that emit records in notifyCheckpointComplete.
> > >
> > > Best,
> > > Yun

> > > ------------------Original Mail ------------------
> > > Sender: Arvid Heise
> > > Send Date: Fri Feb 12 20:46:04 2021
> > > Recipients: dev
> > > CC: Yun Gao
> > > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

> > > Hi Piotr,
> > >
> > > Thank you for raising your concern. Unfortunately, I do not have a better idea than doing closing of operators intermittently with checkpoints (= multiple last checkpoints).
> > >
> > > However, two ideas on how to improve the overall user experience:
> > > 1. If an operator is not relying on notifyCheckpointComplete, we can close it faster (without waiting for a checkpoint). In general, I'd assume that almost all non-sinks behave that way.
> > > 2. We may increase the checkpointing frequency for the last checkpoints. We need to avoid overloading checkpoint storages and task managers, but I assume the more operators are closed, the lower the checkpointing interval can be.
> > >
> > > For 1, I'd propose to add (name TBD):
> > >
> > > default boolean StreamOperator#requiresFinalCheckpoint() {
> > >     return true;
> > > }
> > >
> > > This means all operators are conservatively (= slowly) closed. For most operators, we can then define their behavior by overriding in AbstractUdfStreamOperator:
> > >
> > > @Override
> > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
> > >     return userFunction instanceof CheckpointListener;
> > > }
> > >
> > > This idea can be further refined by also adding requiresFinalCheckpoint to CheckpointListener, to exclude all operators with UDFs that implement CheckpointListener but do not need it for 2pc:
> > >
> > > @Override
> > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
> > >     return userFunction instanceof CheckpointListener &&
> > >         ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > > }
> > >
> > > That approach would also work for state backends / snapshot strategies that require some 2pc.
> > >
> > > If we can contain it to the @PublicEvolving StreamOperator, it would be better of course.
> > >
> > > Best,
> > > Arvid
> > >
> > > On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski wrote:

> > > > Hey,
> > > >
> > > > I would like to raise a concern about the implementation of the final checkpoints, taking into account operators/functions that are implementing the two-phase commit (2pc) protocol for exactly-once processing with some external state (kept outside of Flink). Primarily exactly-once sinks.
> > > >
> > > > First of all, as I understand it, this is not planned in the first version of this FLIP. I'm fine with that, however I would strongly emphasize this in every place we will be mentioning FLIP-147 efforts. This is because me, as a user, upon hearing "Flink supports checkpointing with bounded inputs" I would expect 2pc to work properly and to commit the external side effects upon finishing. As it is now, I (as a user) would be surprised by a silent data loss (of not committed trailing data). This is just a remark that we need to attach this warning to every blog post/documentation/user mailing list response related to "Support Checkpoints After Tasks Finished". Also I would suggest to prioritize the follow-up of supporting 2pc.
> > > >
> > > > Secondly, I think we are missing how difficult and problematic 2pc support with the final checkpoint will be.
> > > >
> > > > For starters, keep in mind that currently 2pc can be implemented by users using both `@Public` APIs as functions and `@PublicEvolving` operators in any place in the job graph. It's not limited to only the sinks. For example users could easily implement an `AsyncFunction` (for `AsyncWaitOperator`) that is using 2pc based on the `CheckpointListener` interface. I'm not saying it's common, probably just a tiny minority of users are doing that (if any at all), but nevertheless that's possible and currently (implicitly?) supported in Flink.
> > > >
> > > > The next complication is the support of bounded streams (`BoundedOneInput` or `BoundedMultiInput` interfaces) and the closing/shutdown procedure of the operators. Currently it works as follows:
> > > > 0. Task receives EndOfPartitionEvent (or source finishes)
> > > > 1. `endOfInput` is called on the first operator in the chain
> > > > 2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
> > > > 3. We wait for the already fired timers to finish executing (spinning mailbox loop)
> > > > 4. We close the first operator
> > > > 5. We go to the next (second) operator in the chain and repeat steps 1. to 5.
> > > >
> > > > This is because operators can emit data after processing `endOfInput`, from timers, async mailbox actions and inside the `close` method itself.
> > > >
> > > > Now the problem is that to support the final checkpoint with 2pc, we need to trigger the `snapshotState` and `notifyCheckpointComplete` calls at the very least only after the `endOfInput` call on the operator. Probably the best place would be in between steps 3. and 4. However that means we would be forced to wait for steps 1. to 3. to finish, then wait for a next checkpoint to trigger AND complete, before finally closing the head operator, and only then we can start closing the next operator in the chain:
> > > > 0. Task receives EndOfPartitionEvent (or source finishes)
> > > > 1. `endOfInput` is called on the first operator in the chain
> > > > 2. We quiesce the processing timers (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the first operator, so no new timers will be triggered
> > > > 3. We wait for the already fired timers to finish executing (spinning mailbox loop)
> > > > *3b. We wait for one more checkpoint to trigger and for the `notifyCheckpointComplete` RPC.*
> > > > 4. We close the first operator
> > > > 5. We go to the next (second) operator in the chain and repeat steps 1. to 5.
> > > >
> > > > That means we can close one operator per successful checkpoint. To close 10 operators, we would need 10 successful checkpoints.
> > > >
> > > > I was thinking about different approaches to this problem, and I couldn't find any viable ones. All I could think of would break the current `@Public` API and/or would be ugly/confusing for the users.
> > > >
> > > > For example, a relatively simple solution of introducing a `preClose` or `flush` method to the operators, with a contract that after `flush` operators would be forbidden from emitting more records (so that we can replace step 4. with this `flush` call and then have a single checkpoint to finish 2pc for all of the operators inside the chain), doesn't work. The sheer fact of adding this `flush` method and changing the contract would break the current API, and Yun Gao has pointed out to me that we either already support, or want to support, operators that are emitting records from within the `notifyCheckpointComplete` call:
> > > >
> > > > > Yun Gao:
> > > > > like with the new sink api there might be writer -> committer -> global committer, the committer would need to wait for the last checkpoint to commit the last piece of data, and after that it also needs to emit the list of transactions that got committed to the global committer to do some finalization logic.
> > > >
> > > > So it wouldn't solve the problem (at least not fully).
> > > >
> > > > I don't know if anyone has any better ideas how to solve this problem?
> > > >
> > > > Piotrek

> > > > On Fri, Jan 15, 2021 at 14:57 Yun Gao wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > I think so, since we seem to have no other divergences and new objections now. I'll open the vote then. Many thanks!
> > > > >
> > > > > Best,
> > > > > Yun
> > > > >
> > > > > ------------------------------------------------------------------
> > > > > From: Aljoscha Krettek
> > > > > Send Time: 2021 Jan. 15 (Fri.) 21:24
> > > > > To: dev
> > > > > Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
> > > > >
> > > > > Thanks for the summary! I think we can now move towards a [VOTE] thread, right?
> > > > >
> > > > > On 2021/01/15 13:43, Yun Gao wrote:
> > > > > > 1) For the problem that the "new" root task coincidently finished before getting triggered successfully, we have listed two options in the FLIP-147[1], for the first version, now we are not tend to go with the first option that JM would re-compute and re-trigger new sources when it realized some tasks are not triggered successfully. This option would avoid the complexity of adding new PRC and duplicating task states, and in average case it would not cause too much overhead.
> > > > >
> > > > > You wrote "we are *not* tend to go with the first option", but I think you meant to write "we tend to *now* go with the first option", right? That's also how it is in the FLIP, I just wanted to clarify for the mailing list.
