Hi Arvid,

What's the problem with providing `void flush()`/`void drain()` only in the `SinkFunction`? It would avoid the typing problem. Why would one need to have it in the other `Rich***Function`s? For `flush()` to make sense, the entity that has this method would need to buffer some data. I don't see this as reasonable in anything but `SinkFunction`, `ProcessFunction` and operators.
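A minimal sketch of why a sink-only `flush()` sidesteps the typing problem, assuming a hypothetical `BufferingSink<IN>` interface (this is not Flink's actual `SinkFunction` API). Because a sink has no output, a no-argument `flush()` with a default no-op body needs no output type parameter, and only sinks that actually buffer data have to override it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sink UDF; NOT Flink's SinkFunction API.
interface BufferingSink<IN> {
    void invoke(IN value);

    // Called once when the input is finished; sinks without buffers can ignore it.
    default void flush() {}
}

// Example sink that writes records in batches of a fixed size.
class BatchingListSink implements BufferingSink<String> {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the external system; each element is one written batch.
    final List<List<String>> writtenBatches = new ArrayList<>();

    BatchingListSink(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void invoke(String value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            writeBuffer();
        }
    }

    @Override
    public void flush() {
        // Write out the partially filled batch so no buffered data is lost.
        if (!buffer.isEmpty()) {
            writeBuffer();
        }
    }

    private void writeBuffer() {
        writtenBatches.add(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With a batch size of 2, invoking the sink with "a", "b" and "c" writes one full batch; the final `flush()` writes the remaining ["c"].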

Re `flush()` vs `drain()`: frankly, I wouldn't understand what `drain()` is all about without reading the Javadoc, and afterwards I would have the impression that someone wanted to reinvent the wheel :) `flush()` is kind of an industry standard for things like that. Furthermore, I don't think `drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()` would be better in this regard, but it also doesn't feel right. Maybe `finish()`?

Piotrek

On Wed, 9 Jun 2021 at 11:51, Arvid Heise <ar...@apache.org> wrote:

> Hi Dawid,
>
> I see your point. I'd probably add drain only to Rich*Function where we have the type bounds. Then we still need your Flushable<T> interface in Rich*Function<..., T> to call it efficiently, but we at least avoid weird type combinations. I'll have a rethink later.
>
> The proper solution is probably to add <OUT> to RichFunction and use Void for RichSinkFunction, but I'll have to understand the implications first.
>
> On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
>> Hey,
>>
>> @Arvid The problem with adding the "drain/flush/stopProcessing" method to RichFunction is that it is not typed with the output type. At the same time, we would most likely need a way to emit records from the method. That's why we originally thought about adding a typed interface, which, honestly, I don't like that much either.
>>
>> On the UDF level we do not need to deprecate anything, as you said. The close there already works as dispose on the Operator level. What we are suggesting is to unify that on the Operator level and deprecate the dispose there. @Yun I think we can already do that. We can either try to handle exceptions from the close in the case of a failure, or just break it, as it is a low-level, mostly internal API, as Arvid said, and the migration would also be really easy there.
>>
>> @Till @Arvid I am open to suggestions about the naming. I like the "drain" method.
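Arvid's idea of adding `<OUT>` to the function type and using `Void` for sinks can be sketched as follows. All names here (`TypedFunction`, `CountingFunction`, `PrintingSink`, `ListCollector`) are hypothetical, and `Collector` is a minimal stand-in that only mirrors the shape of Flink's `org.apache.flink.util.Collector`. Because `drain` takes a `Collector<OUT>` for the function's own `OUT`, and Java does not let a class implement the same generic interface twice with different type arguments, the mismatched `MapFunction<String, Long>`/`Flushable<Double>` combination has no counterpart in this design.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in mirroring the shape of Flink's Collector.
interface Collector<T> {
    void collect(T record);
}

// Hypothetical base interface that carries the function's output type.
interface TypedFunction<OUT> {
    // Called once when all input has been consumed; may emit final records of type OUT.
    default void drain(Collector<OUT> out) {}
}

// Counts its input and emits the total only when drained.
// A declaration like "implements TypedFunction<Long>, TypedFunction<Double>" would not compile.
class CountingFunction implements TypedFunction<Long> {
    private long count;

    void process(String value) {
        count++;
    }

    @Override
    public void drain(Collector<Long> out) {
        out.collect(count);
    }
}

// A sink has no output, so OUT is Void; the inherited no-op drain() emits nothing.
class PrintingSink implements TypedFunction<Void> {
    void invoke(String value) {
        System.out.println(value);
    }
}

// Collector that stores emitted records in a list for inspection.
class ListCollector<T> implements Collector<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void collect(T record) {
        records.add(record);
    }
}
```

After processing three records, draining the `CountingFunction` emits a single `3L`, while draining the `PrintingSink` emits nothing.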
>>
>> For now, I'd go with @Piotr's proposal to add the "drain" method only to the SinkFunction. We think it is not immediately necessary for any of the other UDFs.
>>
>> Best,
>>
>> Dawid
>>
>> On 09/06/2021 11:20, Arvid Heise wrote:
>>
>> I have not followed the complete discussion and can't comment on the concepts. However, I have some ideas on the API changes:
>>
>> 1. If it's about adding additional life-cycle methods to UDFs, we should add the flush/endOfInput to RichFunction, as this is the current way to define it. At this point, I don't see the need to add/change anything for UDFs. Since RichFunction does not have a dispose, do we even need to deprecate anything on the UDF level? This would avoid having a new interface Flushable altogether (of which I'm not a big fan, see Piotr's mail).
>>
>> 2. Further, I'd like to propose drain instead of flush, as it would align better with the current nomenclature and make the intent more obvious. However, that only works if there is no clash, so please double-check.
>>
>> 3. About changing methods on Operators: I'd say go ahead. It's experimental and not too hard to adjust on the user side. I also like the idea of beefing up ProcessFunction as a full replacement for custom operators, but I'd keep that effort separate.
>>
>> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Thanks for the lively discussion, everyone. I have to admit that I am not really convinced that we should call the interface Flushable and the method flush. The problem is that this method should, in the first place, tell the operator that it should stop processing and flush all buffered data. The method "flush" alone does not convey this contract very well. Maybe a more explicit name like stopProcessingAndFlush (maybe only stopProcessing) would be better.
>>> Moreover, from the OutputStream.flush method, I would expect that I can call this method multiple times without changing the state of the stream. This is not the case here.
>>>
>>> Given that "stop processing and flush all results" is such an essential lifecycle method of an operator/UDF, I am not sure whether we should offer it as an optional interface users can implement. The problem I see is that users are not aware of it when writing their own operators/UDFs. Making it part of the actual interfaces makes it more explicit and easier to discover. Maybe there is a way of adding it together with a default implementation, deprecating the other methods, and then eventually removing the old methods. The last step will break APIs, though :-(
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for resuming this discussion. I think +1 for the proposal of dropping (deprecating) `dispose()` and adding `flush()` to the `StreamOperator`/UDFs. Semantically, the new `close()` would be the equivalent of the old `dispose()`, and the old `close()` would be the equivalent of the new `flush() + close()`. I think it provides a relatively painless migration path (could we write down this migration?).
>>>>
>>>> However, I have some doubts about the Flushable<T> interface. First of all, it wouldn't work for sinks: sinks have no output. Secondly, I don't like that it opens the possibility of problems like this (incompatible output types):
>>>>
>>>> ```
>>>> public class MyMap implements MapFunction<String, Long>, Flushable<Double> { ... }
>>>> ```
>>>>
>>>> Also, after a quick offline discussion with Dawid, I'm not sure anymore to which UDFs it actually makes sense to add `flush`, as most of them shouldn't buffer any data.
>>>> Apart from sinks, it's usually an operator that is buffering the data (that holds true for AsyncFunction, ReduceFunction, AggregateFunction, MapFunction, FilterFunction, ...). For those functions it's difficult to buffer any data, as they have no means to control when to emit this data. One notable exception might be (Co)ProcessFunction, as it can register timers on its own. In that case, I would propose the following:
>>>>
>>>> 1. Add `flush() {}` to the `Sink` function (the FLIP-143 Writer interface already has flush capabilities).
>>>> 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but maybe we can postpone it.
>>>> 3. Leave other functions alone.
>>>>
>>>> After all, we could add `flush()` to other functions in the future if we really find a good motivating example to do so.
>>>>
>>>> About 2: Dawid is pitching an idea to convert `ProcessFunction` into a proper `Public` API that would replace StreamOperator. We could change `StreamOperator` to be a purely `@Internal` class/interface and add the missing functionality to the `ProcessFunction` (InputSelectable, BoundedInput, MailboxExecutor). With this, adding `flush()` to `ProcessFunction` would make a lot of sense. But maybe that should be a story for another day?
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Fri, 4 Jun 2021 at 10:36, Yun Gao <yungao...@aliyun.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Many thanks @Dawid for resuming the discussion and many thanks @Till for the summary! (And very sorry that I missed the mail and did not respond in time...)
>>>>>
>>>>> I also agree that we could consider the global commits later, separately, after we have addressed the final checkpoints and the other points Till summarized.
>>>>> Currently, the only cases that use the cascade commit are the Table FileSystem and Hive connectors. I checked the code and found that they currently commit the last piece of data directly in endOfInput(). Although this might emit duplicate records if there is a failover while the job is finishing, it avoids emitting the records in notifyCheckpointComplete() after endOfInput(). Thus, the modification to the operator lifecycle for final checkpoints would not cause compatibility problems for these connectors, and we do not need to modify them in the first place.
>>>>>
>>>>> 2. Regarding the operator lifecycle, I also agree with the proposed changes. To sum up, I think the operator lifecycle would become:
>>>>>
>>>>>     endOfInput(1)
>>>>>     ...
>>>>>     endOfInput(n)
>>>>>     flush()         --> call UDF's flush method
>>>>>     if some operator requires final checkpoints
>>>>>         snapshotState()
>>>>>         notifyCheckpointComplete()
>>>>>     end if
>>>>>     close()         --> call UDF's close method
>>>>>
>>>>> Since close() is currently only called on a normal finish, while dispose() is called in both the failover and the normal case, I think that, for compatibility, we may have to postpone the change to a single close() method until version 2.0?
>>>>>
>>>>> 3. Regarding the name and position of the flush() method, I also agree that we will need a separate method to mark the termination of the whole stream for multiple-input streams. Would it also be OK if we modified the current BoundedXXInput interfaces as follows:
>>>>>
>>>>>     interface BoundedInput {
>>>>>         void endInput(); // marks the end of the whole stream, as flush() does
>>>>>     }
>>>>>
>>>>>     @Deprecated // In the future we could remove this interface
>>>>>     interface BoundedOneInput extends BoundedInput {}
>>>>>
>>>>>     interface BoundedMultiInput extends BoundedInput {
>>>>>         void endInput(int i);
>>>>>
>>>>>         default void endInput() {} // For compatibility
>>>>>     }
>>>>>
>>>>> If an operator/UDF does not care about the end of a single input, it could directly implement the BoundedInput interface. The possible benefit, to me, is that we might be able to keep only one concept for marking the end of a stream, especially for operators with only one input.
>>>>>
>>>>> Many thanks for all the deep insights and discussions!
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>> ------------------------------------------------------------------
>>>>> From: Dawid Wysakowicz <dwysakow...@apache.org>
>>>>> Send Time: 2021 Jun. 3 (Thu.) 21:21
>>>>> To: dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com>
>>>>> Cc: Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>; Stephan Ewen <se...@apache.org>
>>>>> Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for the very insightful discussion. I'd like to revive the effort of FLIP-147. First of all, from my side I'd like to say that I am really interested in helping that happen in the upcoming 1.14 release.
>>>>>
>>>>> I agree with Till that the final checkpoints and global commits are mostly orthogonal. Like Till, I'd suggest focusing first on the final checkpoints, while keeping in mind that we should not make assumptions that would make it impossible to implement the global commits. So far I do not see such a risk in the discussion.
>>>>>
>>>>> Going back to the final checkpoints issue.
>>>>> I think the only outstanding issue is which methods we want to use for flushing/closing both operators and UDFs just before performing the final checkpoint. As pointed out to me by Piotr, I am mentioning UDFs here as well, because we need a way for users of the Public API to benefit from the final checkpoint (bear in mind that, e.g., TwoPhaseCommitSinkFunction, which is implemented by our Kafka sink, operates on the UDF level). Right now RichFunction has no method that could be called just before the final checkpoint to say "flush all intermediate state now and prepare for the final checkpoint". I'd suggest introducing an additional interface, e.g. (name to be determined):
>>>>>
>>>>>     interface Flushable<T> {
>>>>>         void flush(Collector<T> out);
>>>>>     }
>>>>>
>>>>> Additionally, we would need to introduce a similar method on the StreamOperator level. Currently, we have two methods that are called at the end of the operator lifecycle:
>>>>>
>>>>> - close
>>>>> - dispose
>>>>>
>>>>> The usage of the two methods is a bit confusing. Dispose is responsible for closing all open resources and is supposed to be called in case of a failure. On the other hand, close is a combination of the non-existent "flush" method we need and dispose for closing resources in case of a successful run. I'd suggest cleaning this up a bit. We would introduce a proper "flush" method, which would be called when an operator finishes successfully. Moreover, we would make "close" deal only with closing any open resources, basically taking over the role of dispose, which we would deprecate.
>>>>>
>>>>> Lastly, I'd like to explain why I think it is better to introduce a new "flush" method instead of using the "endInput" method of BoundedOne/MultiInput.
>>>>> That method is called per input, which means each operator would need to internally keep track of which inputs were already closed and react differently once all of the inputs were closed. With an explicit "flush" method, we do not have such a problem, as the input bookkeeping happens on the StreamTask level.
>>>>>
>>>>> Let me know what you think. I'd sync with Yun Gao and, if there are no objections, we will extend the FLIP page with the necessary changes.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
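The lifecycle order Yun summarized (endOfInput(1) ... endOfInput(n), flush, optional final checkpoint, close), combined with Dawid's point that input bookkeeping belongs to the task rather than to each operator, can be sketched as below. `LifecycleOperator`, `TaskDriver` and `RecordingOperator` are hypothetical names for illustration only, not Flink's `StreamOperator` or `StreamTask`, and the final checkpoint is reduced to two callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator interface following the proposed lifecycle (not Flink's StreamOperator).
interface LifecycleOperator {
    default void endInput(int inputId) {}       // optional per-input notification
    void flush();                               // called once, after all inputs have ended
    default void snapshotState() {}
    default void notifyCheckpointComplete() {}
    void close();                               // releases resources on success and on failure
}

// Hypothetical task-level driver: it tracks which inputs have ended,
// so operators do not need their own bookkeeping.
class TaskDriver {
    private final LifecycleOperator operator;
    private final boolean[] ended;
    private final boolean finalCheckpointRequired;
    private int remaining;

    TaskDriver(LifecycleOperator operator, int numInputs, boolean finalCheckpointRequired) {
        this.operator = operator;
        this.ended = new boolean[numInputs];
        this.remaining = numInputs;
        this.finalCheckpointRequired = finalCheckpointRequired;
    }

    // Inputs are numbered 1..n, as in endOfInput(1) ... endOfInput(n).
    void onEndOfInput(int inputId) {
        if (ended[inputId - 1]) {
            return; // ignore duplicate end-of-input signals
        }
        ended[inputId - 1] = true;
        operator.endInput(inputId);
        if (--remaining == 0) {
            finish();
        }
    }

    private void finish() {
        operator.flush();
        if (finalCheckpointRequired) {
            operator.snapshotState();
            operator.notifyCheckpointComplete();
        }
        operator.close();
    }

    // On failure, flush() is skipped and only close() cleans up.
    void onFailure() {
        operator.close();
    }
}

// Records the order of lifecycle calls for inspection.
class RecordingOperator implements LifecycleOperator {
    final List<String> calls = new ArrayList<>();

    @Override public void endInput(int inputId) { calls.add("endInput(" + inputId + ")"); }
    @Override public void flush() { calls.add("flush"); }
    @Override public void snapshotState() { calls.add("snapshotState"); }
    @Override public void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }
    @Override public void close() { calls.add("close"); }
}
```

With two inputs ending in the order 2, 2 (duplicate), 1 and a final checkpoint required, the operator sees endInput(2), endInput(1), flush, snapshotState, notifyCheckpointComplete, close; it never counts inputs itself. On failure it only sees close, which matches the proposed split where close takes over the role of dispose.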