+1 to what Arvid said.
I am also thinking we could even consider dropping the dispose method
straightaway to make the need for migration obvious. I'd make that
decision during the implementation/on the PR though, once we verify if
the deprecation option works.
Best,
Dawid
On 10/06/2021 09:37, A
The whole operator API is only for advanced users and is not marked
Public(Evolving). Users have to accept that things change, and we should
use the freedom here that we don't have in many other parts of the system.
The change needs to be very clear in the change notes though. I also don't
expect many…
Hi all,
Many thanks for the warm discussions!
Regarding the change in the operator lifecycle, I also agree with adding
the flush/drain/stopAndFlush/finish method. As for the duplication between
this method and `endInput` for one-input operators, after some offline
discussion with Dawid, I now also…
Hi Piotr,
I'm fine with just doing it on the Sink. My responses were focused on the
API (the how), not on the concept (the if). Just keep the methods in the
different places in sync, such that it is easy to introduce a common
interface later.
Re name: drain is not a reinvention, as it's used quite often…
Hi,
Arvid: What's the problem with providing `void flush()`/`void drain()` only
in the `SinkFunction`? It would avoid the problem of typing. Why would one
need to have it in the other `Rich***Function`s? For `flush()` to make
sense, the entity which has this method would need to buffer some data.
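For illustration, a minimal sketch of that suggestion (a hypothetical
mixin, not the actual API; only `SinkFunction` is the real Flink
interface):

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // Hypothetical sketch: flush() lives only on the sink, which buffers
    // records of its input type IN, so no new type parameter is needed
    // anywhere else.
    public interface FlushableSinkFunction<IN> extends SinkFunction<IN> {

        // Called once the input has ended (or on stop-with-savepoint
        // --drain): push any buffered data to the external system.
        void flush() throws Exception;
    }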
Hi Dawid,
I see your point. I'd probably add drain only to the Rich*Function variants
where we have the type bounds. Then we would still need your Flushable
interface in Rich*Function<..., T> to call it efficiently, but we would at
least avoid weird type combinations. I'll have a rethink later.
The proper solution is prob…
Hey,
@Arvid The problem with adding the "drain/flush/stopProcessing" method
to RichFunction is that it is not typed with the output type. At the
same time we would most likely need a way to emit records from the
method. That's why I originally thought about adding a typed interface,
which honestly I don't…
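To make the typing issue concrete, a rough sketch (my illustration of the
problem being described here, not a settled design):

    import org.apache.flink.util.Collector;

    // RichFunction carries no output type parameter, so a drain()/flush()
    // declared there could not emit records in a type-safe way. A typed
    // mixin interface sidesteps that (sketch only):
    public interface Flushable<T> {

        // Flush buffered data, emitting any remaining records downstream.
        void flush(Collector<T> out) throws Exception;
    }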
I have not followed the complete discussion and can't comment on the
concepts. However, I have some ideas on the API changes:
1. If it's about adding additional life-cycle methods to UDFs, we should
add the flush/endOfInput to RichFunction, as this is the current way to
define them. At this point, I…
Thanks for the lively discussion everyone. I have to admit that I am not
really convinced that we should call the interface Flushable and the method
flush. The problem is that this method should first of all tell the
operator that it should stop processing and flush all buffered data. The
met…
Hi,
Thanks for resuming this discussion. I think +1 for the proposal of
dropping (deprecating) `dispose()` and adding `flush()` to the
`StreamOperator`/UDFs. Semantically, the new `close()` would be an
equivalent of the old `dispose()`, and the old `close()` an equivalent of
the new `flush() + close()`.
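Put as a sketch (method names as proposed in this thread, not necessarily
the final API):

    // Sketch of the proposed operator lifecycle; names are from this
    // thread and not final.
    public interface ProposedOperatorLifecycle {

        void open() throws Exception;

        // New: finish processing and emit all buffered data once the
        // input has ended. Old close() ~= new flush() + new close().
        void flush() throws Exception;

        // New close() only releases resources, i.e. it takes over the old
        // dispose(), and runs on both the success and the failure path.
        void close() throws Exception;
    }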
Hi all,
Many thanks @Dawid for resuming the discussion and many thanks @Till for
the summary! (and sorry that I missed the mail and did not respond in
time...)
I also agree that we could consider the global commits separately later,
after we have addressed the final checkpoints, and a…
>> …all of the operators have successfully processed
>> `notifyCheckpointComplete()`. This would be more difficult to implement,
>> hence I would prefer "undefined" behaviour here, but it's probably possible.
>
> Many thanks for the further explanation, and … Regarding the order, I
> would still tend towards supporting the ordered case, since the sinks'
> implementations seem to depend on this functionality.
>
> Best,
> Yun
---
From: Piotr Nowojski
Send Time: 2021 Mar. 4 (Thu.) 22:56
To: Kezhu Wang
Cc: Till Rohrmann; Guowei Ma; dev; Yun Gao; jingsongl...@gmail.com
Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Yun and Kezhu,
> * …`EndOfPartitionEvent` from `Task` to `StreamTask`. This may have some
>   interferences with BatchTask or the network IO stack.
> * Or introducing a stream-task-level `EndOfUserRecordsEvent` (from
>   PR#14831, @Yun @Piotr)
> * Or special…
> 1. The EndOfPartitionEvent is currently emitted in Task instead of
>    StreamTask; we would need some refactoring here.
> 2. Currently the InputGate/InputChannel would be released after the
>    downstream tasks have received EndOfPartitionEvent from all the input
>    channels; this would make the following checkpoint unable to…
From: Piotr Nowojski
Send Time: 2021 Mar. 4 (Thu.) 17:16
To: Kezhu Wang
Cc: dev; Yun Gao; jingsongl...@gmail.com; Guowei Ma; Till Rohrmann
Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Kezhu,
What do you mean by "end-flushing"? I was suggesting…
> …when the job finishes again, would it re-emit the MAX_WATERMARK?
>
> Best,
> Yun
> …of the operator: like in Kezhu's proposal, the close() happens last,
> but it seems close() might also emit records (so now the operators are
> closed with op1's close() -> op2's endOfInput() -> op2's close() ->
> op3's endOfInput() -> ...)?
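To make the questioned ordering concrete, a tiny sketch (illustration
only, not Flink's actual code):

    // The interleaved shutdown order implied by the quote above, for a
    // chain op1 -> op2 -> op3 where close() may still emit records.
    final class ChainShutdownSketch {
        interface Op {
            void endOfInput() throws Exception;
            void close() throws Exception; // may emit records downstream
        }

        static void closeChain(Op op1, Op op2, Op op3) throws Exception {
            op1.close();      // may emit records into op2
            op2.endOfInput(); // only now does op2 know its input is done
            op2.close();      // may emit records into op3
            op3.endOfInput();
            op3.close();
        }
    }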
From: Kezhu Wang
Send Time: 2021 Mar. 1 (Mon.) 01:26
To: Till Rohrmann
Cc: Piotr Nowojski; Guowei Ma; dev; Yun Gao; jingsongl...@gmail.com
Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
In "stop-with-savepoint --drain", MAX_WATERMARK is not an is…
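For context, a conceptual sketch of what "--drain" does before the final
savepoint (not the actual runtime code):

    import org.apache.flink.streaming.api.operators.Output;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // "--drain" pushes MAX_WATERMARK through the pipeline before the
    // final savepoint, so every event-time timer fires and windows emit
    // their final results.
    final class DrainSketch {
        static void drainBeforeSavepoint(Output<?> output) {
            output.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }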
> …notifyCheckpointComplete, stop-with-savepoint --drain could be done
> with one savepoint, and for the normal exit, the operator would not
> need to wait for other slow operators to exit.
>
> Best,
> Yun
>
> --Original Mail --
> Sender: Kezhu Wang
> Send Date: Thu Feb 25 15:11:53 2021
> Recipients: Flink Dev, Piotr Nowojski <piotr.nowoj...@gmail.com>
> CC: Guowei Ma, jingsongl...@gmail.com
> Subject: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all, thanks for driving this, and thanks especially to Piotr for
reviving this thread.
First, for `notifyCheckpointComplete`, I have a strong preference towards
"shut down the dataflow pipeline with one checkpoint in total", so I tend
towards the option of dropping "send records" from
`notifyCheckpointComplete` for ne…
Thanks for the responses Guowei and Yun,
Could you elaborate more / remind me: what does it mean to replace emitting
results from `notifyCheckpointComplete` with the `OperatorCoordinator`
approach?
About the discussion in FLINK-21133 and how it relates to FLIP-147: you are
right, Yun Gao, that in cas…
Hi Till, Guowei,
Many thanks for initiating the discussion and the deep thoughts!
For notifyCheckpointComplete, I also agree we could try to avoid emitting
new records in notifyCheckpointComplete by using OperatorCoordinator for
the new sink API. Besides, the hive sink might also need some modi…
Hi Till,
Thank you very much for your careful consideration.
*1. Emit records in `NotifyCheckpointComplete`.*
Sorry for the misunderstanding caused by my wording. I just want to say
that the current interface does not prevent users from doing it.
From the perspective of the new sink API,
Thanks for the explanation Yun and Guowei. I have to admit that I do not
fully understand why this is strictly required, but I think that we are
touching on two very important aspects which might have far-reaching
consequences for how Flink works:
1) Do we want to allow that multiple checkpoints are r…
Thanks Yun for the detailed explanation.
A simple supplementary explanation about the sink case: maybe we could use
`OperatorCoordinator` to avoid sending the element to the downstream
operator.
But I agree we could not stop users from emitting records in
`notifyCheckpointComplete`.
Best,
Guowei
Hi all,
I'd like to first detail the issue with emitting records in
notifyCheckpointComplete, for context. As a specific usage example, a sink
might want to write some metadata after all the transactions are committed
(like writing a marker file _SUCCESS to the output directory).
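As a hypothetical illustration of that marker-file use case (class name
invented, transaction bookkeeping elided):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import org.apache.flink.api.common.state.CheckpointListener;

    // A sink-side hook that writes a _SUCCESS marker once the checkpoint
    // covering its last transactions completes.
    class SuccessMarkerHook implements CheckpointListener {

        private final Path outputDir;
        private volatile boolean allTransactionsCommitted; // set elsewhere

        SuccessMarkerHook(Path outputDir) {
            this.outputDir = outputDir;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (allTransactionsCommitted) {
                Files.createFile(outputDir.resolve("_SUCCESS"));
            }
        }
    }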
Hi Piotr,
Thank you for raising your concern. Unfortunately, I do not have a better
idea than closing the operators intermittently with checkpoints (=
multiple last checkpoints).
However, two ideas on how to improve the overall user experience:
1. If an operator is not relying on notifyCheck…
Hey,
I would like to raise a concern about the implementation of the final
checkpoints, taking into account operators/functions that implement the
two-phase commit (2PC) protocol for exactly-once processing with some
external state (kept outside of Flink); primarily exactly-once sinks.
First of…
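To spell out the 2PC interaction (a simplified timeline following the
`TwoPhaseCommitSinkFunction` contract):

    // invoke(record)               -> write into the current transaction
    // snapshotState(checkpointId)  -> preCommit(txn), open a new txn
    // notifyCheckpointComplete(id) -> commit(txn)  // external side effect
    //
    // Without one more checkpoint (and its completion notification) after
    // the last records, the final transaction would never be committed.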
Hi Aljoscha,
I think so, since we seem to have no other divergences or new objections
now. I'll open the vote then. Many thanks!
Best,
Yun
--
From: Aljoscha Krettek
Send Time: 2021 Jan. 15 (Fri.) 21:24
To: dev
Subject: Re:…
Thanks for the summary! I think we can now move towards a [VOTE] thread,
right?
On 2021/01/15 13:43, Yun Gao wrote:
1) For the problem that the "new" root task coincidentally finished
before getting triggered successfully, we have listed two options in
FLIP-147 [1]; for the first version, now…
…erTasksFinished-TriggeringCheckpointsAfterTasksFinished
--
From: Yun Gao
Send Time: 2021 Jan. 13 (Wed.) 16:09
To: dev; user
Subject: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi all,
I updated the FLIP [1] to reflect the…
…eteBeforeFinish
--
From: Yun Gao
Send Time: 2021 Jan. 12 (Tue.) 10:30
To: Khachatryan Roman
Cc: dev; user
Subject: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Hi Roman,
Many thanks for th…
…see explicit problems with waiting for the flush of the pipelined
result partition.
Glad that we have the same viewpoints on this issue. :)
Best,
Yun
----------
From: Khachatryan Roman
Send Time: 2021 Jan. 11 (Mon.) 19:14
To: Yun Gao
Cc: de…
> …would it be ok for us to view it as an optimization and postpone it to
> future versions?
>
> Best,
> Yun
>
> --
> From: Khachatryan Roman
> Send Time: 2021 Jan. 11 (Mon.) 05:46
> To: Yun…
…Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
Thanks a lot for your answers Yun,
> In detail, suppose we have a job with the graph A -> B -> C; suppose in
> one checkpoint A has reported FINISHED, the CheckpointCoordinator would
> choose B as the new "source"…
> …InputChannel.onBuffer() is called with EndOfPartition, and then taking
> a snapshot of the input channels, as normal unaligned checkpoints do for
> the InputChannel side. Then we would be able to ensure that finished
> tasks always have an empty state.
>
> I'll also optimi…
Hi Roman,
Many thanks for the feedback! I'll try to answer the issues inline:
> 1. Option 1 is said to be not preferable because it wastes resources and adds
> complexity (new event).
> However, the resources would be wasted for a relatively short time until the
> job finishes completely.
Thanks for starting this discussion (and sorry for probably duplicated
questions; I couldn't find them answered in the FLIP or this thread).
1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (new event).
However, the resources would be wasted for a relatively sho…
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down; Async I/O would wait for all
> requests, a sink would potentially wait for a given number of checkpoints.
> The inter…
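A minimal sketch of that idea (all names provisional, as said above):

    import java.util.concurrent.CompletableFuture;

    // The operator signals on its own when it is fully done: Async I/O
    // once all in-flight requests have finished, a 2PC sink once enough
    // checkpoints have completed.
    public interface RequiresFinalization {

        // Completes when the operator no longer needs checkpoints and can
        // be shut down for good.
        CompletableFuture<Void> getFinalizationFuture();
    }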
This is somewhat unrelated to the discussion about how to actually do
the triggering when sources shut down; I'll write on that separately. I
just wanted to get this quick thought out.
For letting operators decide whether they actually want to wait for a
final checkpoint, which is relevant at…
Hi Arvid,
Many thanks for the deep thoughts!
> If this somehow works, we would not need to change much in the checkpoint
> coordinator. It would always inject into sources. We could also ignore the
> race conditions as long as the TM lives. Checkpointing times are also not
> worse than with the l…
Okay, then at least you guys are in sync ;) (although I'm also not too far
away).
I hope I'm not super derailing, but could we reiterate why it's good to get
rid of finished tasks (note: I'm also mostly in favor of that):
1. We can free all acquired resources, including buffer pools, state
backend(?),…
On 2021/01/06 16:05, Arvid Heise wrote:
thanks for the detailed example. It feels like Aljoscha and you are also
not fully aligned yet. For me, it sounded as if Aljoscha would like to
avoid sending RPC to non-source subtasks.
No, I think we need the triggering of intermediate operators.
I was
On 2021/01/06 13:35, Arvid Heise wrote:
I was actually not thinking about concurrent checkpoints (and actually want
to get rid of them once UC is established, since they are addressing the
same thing).
I would give a yuge +1 to that. I don't see why we would need concurrent
checkpoints in most
> re-trigger the following tasks.
> Of course this is one possible implementation and we might have other
> solutions to this problem. Do you think the process would still have some
> problems?
>
> > However, that would
> > require subtasks to stay alive until they receive checkpointCompleted…
Hi Arvid,
Many thanks for the feedback! I'll try to answer the questions inline:
> I'm also concerned about the notion of a final checkpoint. What happens
> when this final checkpoint times out (checkpoint timeout > async timeout)
> or fails for a different reason? I'm currently more inclined to…
I was actually not thinking about concurrent checkpoints (and actually want
to get rid of them once UC is established, since they are addressing the
same thing).
But your explanation definitely helped me to better understand the race
condition.
However, I have the impression that you think mostly
On 2021/01/06 11:30, Arvid Heise wrote:
I'm assuming that this is the normal case. In an A->B graph, as soon as A
finishes, B still has a couple of input buffers to process. If you add
backpressure or longer pipelines into the mix, it's quite likely that a
checkpoint may occur with B being the hea…
>
> I was referring to the case where intermediate operators don't have any
> active upstream (input) operators. In that case, they basically become
> the "source" of that part of the graph. In your example, M1 is still
> connected to a "real" source.
I'm assuming that this is the normal case. In
On 2021/01/05 17:27, Arvid Heise wrote:
For your question: will there ever be intermediate operators that should
be running that are not connected to at least one source?
I think there are plenty of examples if you go beyond chained operators and
fully connected exchanges. Think of any fan-in, l…
Hi Arvid,
Many thanks for the feedback!
> For 2) the race condition, I was more thinking of still injecting the
> barrier at the source in all cases, but having some kind of short-cut to
> immediately execute the RPC inside the respective taskmanager.
For 2) the race condition, I was more thinking of still injecting the
barrier at the source in all cases, but having some kind of short-cut to
immediately execute the RPC inside the respective taskmanager. However,
that may prove hard in case of dynamic scale-ins. Nevertheless, because of
this race
Hi Aljoscha,
Many thanks for the feedback!
For the second issue, I'm indeed thinking of the race condition between
deciding to trigger and the operator getting finished. And on this point,
> One thought here is this: will there ever be intermediate operators that
> should be run…
Hi Arvid,
Many thanks for the feedback!
For the second issue, sorry, I think I might not have made it very clear.
I was initially thinking of the case where, for example, for a job with
graph A -> B -> C, when we compute which tasks to trigger, A is still
running, so we trigger A to…
Hi Yun,
1. I'd think that this is an orthogonal issue, which I'd solve separately.
My gut feeling says that this is something we should only address for new
sinks where we decouple the semantics of commits and checkpoints anyways.
@Aljoscha Krettek any idea on this one?
2. I'm not sure I get it…
Hi all,
I tested the previous PoC against the current tests and found some new
issues that might cause divergence (and sorry, there might also be some
reversals on some previous problems):
1. Which operators should wait for one more checkpoint before close?
One motiv…
Hi Aljoscha,
Many thanks for the feedback! For the remaining issues:
> 1. You mean we would insert "artificial" barriers for barrier 2 in case
> we receive EndOfPartition while other inputs have already received
> barrier 2? I think that makes sense, yes.
Yes, exactly. I…
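Concretely, for a task with two input channels (illustration only):

    // channel A:  ... | barrier#2 | ...    <- barrier 2 received
    // channel B:  ... | EndOfPartition     <- finishes without barrier 2
    //
    // The task treats EndOfPartition on B as if barrier 2 had also
    // arrived there ("artificial" barrier), so alignment for checkpoint 2
    // can still complete.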
Thanks for the thorough update! I'll answer inline.
On 14.12.20 16:33, Yun Gao wrote:
1. To include EndOfPartition into consideration for barrier alignment at
the TM side, we now tend to decouple the logic for EndOfPartition from the
normal alignment behaviors to avoid the complex interfe…
Hi all,
I would like to resume this discussion on supporting checkpoints after
tasks finished :) Based on the previous discussion, we have now implemented
a PoC [1] to try out the idea. During the PoC we also ran into some
possible issues:
1. To include EndOfPartition into considerat…
Hi Till,
Many thanks for the feedback!
> 1) When restarting all tasks independent of the status at checkpoint time
> (finished, running, scheduled), we might allocate more resources than we
> actually need to run the remaining job. From a scheduling perspective it
> would be easier if we alrea…
Thanks for starting this discussion Yun Gao,
I have three comments/questions:
1) When restarting all tasks independent of the status at checkpoint time
(finished, running, scheduled), we might allocate more resources than we
actually need to run the remaining job. From a scheduling perspective it
Hi Arvid,
Many thanks for the comments!
>>> 4) Yes, the interaction is not trivial and also I have not completely
>>> thought it through. But in general, I'm currently at the point where I
>>> think that we also need non-checkpoint related events in unaligned
>>> checkpoints. So just keep that i…
Hi Yun,
4) Yes, the interaction is not trivial and also I have not completely
thought it through. But in general, I'm currently at the point where I
think that we also need non-checkpoint related events in unaligned
checkpoints. So just keep that in mind, that we might converge anyhow at
this poin
Hi Arvid,
Many thanks for the insightful comments! I added my responses inline
under the quotes:
>> 1) You call the tasks that get the barriers injected leaf nodes, which
>> would make the sinks the root nodes. That is very similar to how graphs
>> in relational algebra are labeled. H…
Hi Yun,
Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It is also a bit closer to Chandy-Lamport again.
A couple of comments:
1) You call the tasks that get the barriers injected leaf nodes, which
wou
Hi devs & users,
Very sorry for the garbled formatting; I am resending the discussion as
follows.
As discussed in FLIP-131 [1], Flink will make DataStream the unified API
for processing bounded and unbounded data in both streaming and blocking
modes. However, one long-standing problem for the streaming m…