Hi Dawid,

I see your point. I'd probably add drain only to the Rich*Function classes, where we have the type bounds. We would then still need your Flushable<T> interface in Rich*Function<..., T> to call it efficiently, but we would at least avoid weird type combinations. I'll have a rethink later.
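To make the type-bound argument concrete, here is a minimal sketch assuming a hypothetical rich base class that already declares its output type, so a drain hook defined on it can only emit that type. `Collector`, `RichLikeFlatMap`, `BufferingCounter` and `drain` are illustrative stand-ins defined inside the sketch, not Flink's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;

final class TypedDrainSketch {

    /** Hypothetical stand-in for Flink's Collector, so the sketch compiles on its own. */
    interface Collector<T> {
        void collect(T record);
    }

    /**
     * Hypothetical rich base class. OUT is fixed by the class declaration, so a drain hook
     * defined here can only emit OUT; there is no second, unrelated type parameter to mismatch.
     */
    abstract static class RichLikeFlatMap<IN, OUT> {
        abstract void flatMap(IN value, Collector<OUT> out);

        /** No-op by default, so existing subclasses keep compiling unchanged. */
        void drain(Collector<OUT> out) {}
    }

    /** Counts records but emits the count only when drained. */
    static final class BufferingCounter extends RichLikeFlatMap<String, Long> {
        private long buffered;

        @Override
        void flatMap(String value, Collector<Long> out) {
            buffered++; // hold data back instead of emitting per record
        }

        @Override
        void drain(Collector<Long> out) {
            out.collect(buffered); // only a Long compiles here; a Double would be rejected
            buffered = 0;
        }
    }

    static List<Long> run(List<String> input) {
        List<Long> results = new ArrayList<>();
        Collector<Long> out = results::add;
        BufferingCounter fn = new BufferingCounter();
        for (String value : input) {
            fn.flatMap(value, out);
        }
        fn.drain(out); // called once at end of input, before any final checkpoint
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"))); // [3]
    }
}
```

Because `drain` takes a `Collector<OUT>` bound by the class's own type parameter, a pairing such as `MapFunction<String, Long>` with `Flushable<Double>` cannot be expressed. The trade-off is that the hook only exists on classes that carry an `OUT` parameter, so sinks still need a separate answer.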
The proper solution is probably to add <OUT> to RichFunction and use Void for RichSinkFunction, but I'll have to understand the implications first.

On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hey,
>
> @Arvid The problem with adding the "drain/flush/stopProcessing" method to RichFunction is that it is not typed with the output type. At the same time, we would most likely need a way to emit records from the method. That's why we originally thought about adding a typed interface, which, honestly, I don't like that much either.
>
> On the UDF level we do not need to deprecate anything, as you said. The close there already works like dispose does on the Operator level. What we are suggesting is to unify that on the Operator level and deprecate dispose there. @Yun I think we can already do that. We can either try to handle exceptions from close in the case of a failure, or just break it, as it is a low-level, mostly internal API, as Arvid said, and the migration would also be really easy there.
>
> @Till @Arvid I am open to suggestions about the naming. I like the "drain" method.
>
> For now I'd go with @Piotr's proposal to add the "drain" method only to the SinkFunction. We think it is not immediately necessary for any of the other UDFs.
>
> Best,
>
> Dawid
>
> On 09/06/2021 11:20, Arvid Heise wrote:
>
> I have not followed the complete discussion and can't comment on the concepts. However, I have some ideas on the API changes:
>
> 1. If it's about adding additional life-cycle methods to UDFs, we should add the flush/endOfInput to RichFunction, as this is the current way to define them. At this point, I don't see the need to add/change anything for UDFs. Since RichFunction does not have a dispose, do we even need to deprecate anything on the UDF level? This would avoid having a new Flushable interface altogether (of which I'm not a big fan, see Piotr's mail).
>
> 2. Further, I'd like to propose drain instead of flush, as it would align more closely with the current nomenclature and makes the intent more obvious. However, that only works if there is no clash, so please double-check.
>
> 3. About changing methods on Operators: I'd say go ahead. It's experimental and not too hard to adjust on the user side. I also like the idea of beefing up ProcessFunction as a full replacement for custom operators, but I'd keep that effort separate.
>
> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Thanks for the lively discussion, everyone. I have to admit that I am not really convinced that we should call the interface Flushable and the method flush. The problem is that this method should, in the first place, tell the operator that it should stop processing and flush all buffered data. The method name "flush" alone does not convey this contract very well. Maybe a more explicit name like stopProcessingAndFlush (or maybe only stopProcessing) would be better. Moreover, from the OutputStream.flush method, I would expect that I can call this method multiple times w/o changing the state of the stream. This is not the case here.
>>
>> Given that "stop processing and flush all results" is such an essential lifecycle method of an operator/UDF, I am not sure whether we should offer it as an optional interface users can implement. The problem I see is that users are not aware of it when writing their own operators/UDFs. Making it part of the actual interfaces makes it more explicit and easier to discover. Maybe there is a way of adding it together with a default implementation, deprecating the other methods, and then eventually removing the old methods. The last step will break APIs, though :-(
>>
>> Cheers,
>> Till
>>
>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> > Hi,
>> >
>> > Thanks for resuming this discussion.
>> > I'm +1 for the proposal of dropping (deprecating) `dispose()` and adding `flush()` to the `StreamOperator`/UDFs. Semantically, the new `close()` would be equivalent to the old `dispose()`, and the old `close()` would be equivalent to the new `flush() + close()`. I think this provides a relatively painless migration path (could we write down this migration?).
>> >
>> > However, I have some doubts about the Flushable<T> interface. First of all, it wouldn't work for sinks, as sinks have no output. Secondly, I don't like that it opens up the possibility of problems like this (incompatible output types):
>> > ```
>> > public class MyMap implements MapFunction<String, Long>, Flushable<Double> { ... }
>> > ```
>> >
>> > Also, after a quick offline discussion with Dawid, I'm not sure anymore to which UDFs it actually makes sense to add `flush`, as most of them shouldn't buffer any data. Apart from sinks, it's usually the operator that buffers the data (that holds true for AsyncFunction, ReduceFunction, AggregateFunction, MapFunction, FilterFunction, ...). For those functions it's difficult to buffer any data, as they have no means to control when to emit it. One notable exception might be (Co)ProcessFunction, as it can register timers on its own. In that case I would propose the following:
>> > 1. Add `flush() {}` to the `Sink` function (the FLIP-143 Writer interface already has flush capabilities)
>> > 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but maybe we can postpone it
>> > 3. Leave other functions alone.
>> >
>> > After all, we could add `flush()` to other functions in the future if we really find a good motivating example to do so.
>> >
>> > About 2. Dawid is pitching an idea to convert `ProcessFunction` into a proper `Public` API that would replace StreamOperator.
>> > We could change `StreamOperator` to be a purely `@Internal` class/interface and add the missing functionality to `ProcessFunction` (InputSelectable, BoundedInput, MailboxExecutor). With this, adding `flush()` to `ProcessFunction` would make a lot of sense. But maybe that should be a story for another day?
>> >
>> > Best,
>> > Piotrek
>> >
>> > On Fri, Jun 4, 2021 at 10:36 AM Yun Gao <yungao...@aliyun.com> wrote:
>> >
>> >> Hi all,
>> >>
>> >> Many thanks to @Dawid for resuming the discussion and to @Till for the summary! (And sorry that I missed the mail and did not respond in time...)
>> >>
>> >> I also agree that we could consider the global commits later, separately, after we have addressed the final checkpoints and the other points Till summarized.
>> >> Currently, the only cases that use the cascade commit are the Table FileSystem and Hive connectors. I checked the code and found that they currently commit the last piece of data directly in endOfInput(). Although this might emit repeated records if there is a failover while the job is finishing, it avoids emitting the records in notifyCheckpointComplete() after endOfInput(). Thus the modification to the operator lifecycle for final checkpoints would not cause compatibility problems for these connectors, and we do not need to modify them in the first place.
>> >>
>> >> 2. Regarding the operator lifecycle, I also agree with the proposed changes. To sum up, I think the operator lifecycle would become
>> >>
>> >> endOfInput(1)
>> >> ...
>> >> endOfInput(n)
>> >> flush()          --> call UDF's flush method
>> >> if some operator requires final checkpoints
>> >>     snapshotState()
>> >>     notifyCheckpointComplete()
>> >> end if
>> >> close()          --> call UDF's close method
>> >>
>> >> Since close() is currently only called on a normal finish, while dispose() is called in both the failover and the normal case, I think for compatibility we may have to postpone the change to a single close() method until version 2.0?
>> >>
>> >> 3. Regarding the name and position of the flush() method, I also agree that we will need a separate method to mark the termination of the whole stream for multiple-input streams. Would it also be OK to modify the current BoundedXXInput interfaces like this?
>> >>
>> >> interface BoundedInput {
>> >>     void endInput(); // marks the end of the whole stream, as flush() does
>> >> }
>> >>
>> >> @Deprecated // In the future we could remove this interface
>> >> interface BoundedOneInput extends BoundedInput {}
>> >>
>> >> interface BoundedMultiInput extends BoundedInput {
>> >>     void endInput(int i);
>> >>
>> >>     default void endInput() {} // For compatibility
>> >> }
>> >>
>> >> If an operator/UDF does not care about the end of a single input, it could directly implement the BoundedInput interface. The possible benefit, to me, is that we might be able to keep only one concept for marking the end of a stream, especially for operators with only one input.
>> >>
>> >> Many thanks for all the deep insights and discussions!
>> >>
>> >> Best,
>> >> Yun
>> >>
>> >> ------------------------------------------------------------------
>> >> From:Dawid Wysakowicz <dwysakow...@apache.org>
>> >> Send Time:2021 Jun. 3 (Thu.) 21:21
>> >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com>
>> >> Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>; Stephan Ewen <se...@apache.org>
>> >> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>> >>
>> >> Hi all,
>> >>
>> >> Thanks for the very insightful discussion. I'd like to revive the effort of FLIP-147. First of all, from my side I'd like to say that I am really interested in helping make that happen in the upcoming 1.14 release.
>> >>
>> >> I agree with Till that the final checkpoints and global commits are mostly orthogonal. Similarly to Till, I'd suggest first focusing on the final checkpoints, while keeping in mind that we should not make assumptions that would make it impossible to implement the global commits. So far I do not see such a risk from the discussion.
>> >>
>> >> Going back to the final checkpoints issue: I think the only outstanding issue is which methods we want to use for flushing/closing both operators and UDFs just before performing the final checkpoint. As Piotr pointed out to me, I am mentioning UDFs here as well because we need a way for users of the Public API to benefit from the final checkpoint (bear in mind that e.g. TwoPhaseCommitSinkFunction, which is implemented by our Kafka sink, operates on the UDF level). Right now RichFunction has no method that could be called just before the final checkpoint to say "flush all intermediate state now and prepare for the final checkpoint". I'd suggest introducing an additional interface, e.g. (name to be determined)
>> >>
>> >> interface Flushable<T> {
>> >>
>> >>     void flush(Collector<T> out);
>> >>
>> >> }
>> >>
>> >> Additionally, we would need to introduce a similar method on the StreamOperator level.
>> >> Currently we have two methods that are called at the end of the operator lifecycle:
>> >>
>> >> - close
>> >> - dispose
>> >>
>> >> The usage of the two methods is a bit confusing. Dispose is responsible for closing all open resources and is supposed to be called in case of a failure. On the other hand, close is a combination of a non-existent "flush" method, which we need, and dispose for closing resources in case of a successful run. I'd suggest clearing this up a bit. We would introduce a proper "flush" method, which would be called when an operator finishes successfully. Moreover, we would make "close" deal only with closing any open resources, basically taking over the role of dispose, which we would deprecate.
>> >>
>> >> Lastly, I'd like to explain why I think it is better to introduce a new "flush" method instead of using the "endInput" method of BoundedOne/MultiInput. That method is called per input, which means each operator would need to keep track internally of which inputs were already closed and react differently once all of the inputs were closed. With an explicit "flush" method we do not have such a problem, as the input bookkeeping happens on the StreamTask level.
>> >>
>> >> Let me know what you think. I'll sync with Yun Gao and, if there are no objections, we will extend the FLIP page with the necessary changes.
>> >>
>> >> Best,
>> >>
>> >> Dawid
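The end-of-lifecycle order discussed in this thread (endInput per input, a single flush() once every input has ended, an optional final checkpoint, then close() on every path) can be sketched roughly as follows. This is a minimal illustration assuming hypothetical `ProposedOperator`, `RecordingOperator` and `TaskSketch` types; it is not Flink's StreamOperator or StreamTask API, and the final checkpoint is only marked by a comment.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

final class LifecycleSketch {

    /** Hypothetical operator contract following the proposal; not Flink's StreamOperator. */
    interface ProposedOperator {
        void processElement(int input, String record);

        /** Called only on a successful finish: emit everything still buffered. */
        void flush();

        /** Called on every path, success or failure: release resources (the old dispose() role). */
        void close();
    }

    /** Buffers records and logs the lifecycle calls it receives, in order. */
    static final class RecordingOperator implements ProposedOperator {
        final List<String> calls = new ArrayList<>();
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void processElement(int input, String record) {
            buffer.add(record);
        }

        @Override
        public void flush() {
            calls.add("flush(" + buffer.size() + " buffered)");
            buffer.clear();
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Hypothetical task: it tracks which inputs have ended, so the operator does not have to. */
    static final class TaskSketch {
        private final ProposedOperator operator;
        private final int numInputs;
        private final BitSet endedInputs = new BitSet();

        TaskSketch(ProposedOperator operator, int numInputs) {
            this.operator = operator;
            this.numInputs = numInputs;
        }

        void processElement(int input, String record) {
            operator.processElement(input, record);
        }

        /** Input ids are 1-based here, mirroring endOfInput(1) ... endOfInput(n). */
        void endInput(int input) {
            endedInputs.set(input);
        }

        /** Ends the lifecycle; failed == true models a failover. */
        void finish(boolean failed) {
            try {
                if (!failed && endedInputs.cardinality() == numInputs) {
                    operator.flush();
                    // A final checkpoint (snapshotState, notifyCheckpointComplete) would run here.
                }
            } finally {
                operator.close();
            }
        }
    }

    static List<String> run(boolean failed) {
        RecordingOperator op = new RecordingOperator();
        TaskSketch task = new TaskSketch(op, 2);
        task.processElement(1, "a");
        task.processElement(2, "b");
        task.endInput(1);
        task.endInput(2);
        task.finish(failed);
        return op.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [flush(2 buffered), close]
        System.out.println(run(true)); // [close]
    }
}
```

Here the task, not the operator, records which inputs have ended, which is the bookkeeping argument made above for a dedicated flush() over per-input endInput(). Putting close() in a finally block means it runs after a failure too, in line with the proposal that close() take over the role of dispose().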