Thanks for the lively discussion everyone. I have to admit that I am not really convinced that we should call the interface Flushable and the method flush. The problem is that this method should in the first place tell the operator that it should stop processing and flush all buffered data. The method "flush" alone does not convey this contract very well. Maybe a more explicit name like stopProcessingAndFlush (maybe only stopProcessing) would be better. Moreover, from the OutputStream.flush method, I would expect that I can call this method multiple times w/o changing the state of the stream. This is not the case here.
Given that the stop processing and flush all results is such an essential lifecycle method of an operator/UDF, I am not sure whether we should offer it as an optional interface users can implement. The problem I see is that users are not aware of it when writing their own operators/UDFs. Making it part of the actual interfaces makes it more explicit and easier to discover. Maybe there is a way of adding it together with a default implementation, deprecating the other methods, and then eventually removing the old methods. The last step will break APIs, though :-( Cheers, Till On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > Thanks for resuming this discussion. I think +1 for the proposal of > dropping (deprecating) `dispose()`, and adding `flush()` to the > `StreamOperator`/udfs. Semantically it would be more like new `close()` is > an equivalent of old `dispose()`. Old `close()` is an equivalent of new > `flush() + close()`. I think it provides a relatively painless migration > path (could we write down this migration?). > > However I have some doubts about the Flushable<T> interface. First of all, > it wouldn't work for sinks - sinks have no output. Secondly, I don't like > that it opens a possibility for problems like this (incompatible output > types): > ``` > public class MyMap implements MapFunction<String, Long>, Flushable<Double> > { ...} > ``` > > Also after a quick offline discussion with Dawid, I'm not sure anymore to > which UDFs it actually makes sense to add `flush`, as most of them > shouldn't buffer any data. Apart from Sinks, it's usually an operator that > is buffering the data (that holds true for AsyncFunction, ReduceFunction, > AggregateFunction, MapFunction, FilterFunction, ...). For those functions > it's difficult to buffer any data, as they have no means to control when to > emit this data. One notable exception might be (Co)ProcessFunction, as it > can register timers on it's own. In that case I would propose to do the > following thing: > 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface already > has flush capabilities) > 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but maybe we > can postpone it > 3. Leave other functions alone. > > After all, we could add `flush()` to other functions in the future if we > really find a good motivating example to do so. > > About 2. Dawid is pitching an idea to convert `ProcessFunction` into a > proper `Public` API that would replace StreamOperator. We could change > `StreamOperator` to be purely `@Internal` class/interface, and add the > missing functionality to the `ProcessFunction` (InputSelectable, > BoundedInput, MailboxExecutor). With this, adding `flush()` to > `ProcessFunction` would make a lot of sense. But maybe that should be a > story for another day? > > Best, > Piotrek > > pt., 4 cze 2021 o 10:36 Yun Gao <yungao...@aliyun.com> napisaĆ(a): > >> Hi all, >> >> Very thanks @Dawid for resuming the discussion and very thanks @Till for >> the summary ! (and very sorry for I missed the mail and do not response >> in time...) >> >> I also agree with that we could consider the global commits latter >> separately after we have addressed the final checkpoints, and also other >> points as Till summarized. >> Currently the only case that have used the cascade commit is the Table >> FileSystem and Hive connectors. I checked the code and found currently they >> will commit the >> last piece of data directly in endOfInput(). Although this might emit >> repeat records if there are failover during job finishing, it avoids >> emitting the records in the >> notifyCheckpointComplete() after endOfInput(), thus the modification to >> the operator lifecycle in final checkpoints would cause compatibility >> problem for these connectors, >> thus we do not need to modify them at the first place. >> >> 2. Regarding the operator lifecycle, I also agree with the proposed >> changes. To sum up, I think the operator lifecycle would become >> >> endOfInput(1) >> ... >> endOfInput(n) >> flush() --> call UDF's flush method >> if some operator requires final checkpoints >> snapshotState() >> notifyCheckpointComplete() >> end if >> close() --> call UDF's close method >> >> Since currently the close() is only called in normal finish and dispose() >> will be called in both failover and normal case, for compatibility, I think >> we may >> have to postpone the change to a single close() method to version 2.0 ? >> >> 3. Regarding the name and position of flush() method, I also agree with >> that we will need a separate method to mark the termination of the whole >> stream for >> multiple-input streams. Would it be also ok if we have some modification >> to the current BoundedXXInput interfaces to >> >> interface BoundedInput { >> void endInput() // marks the end of the whole streams, as flush() >> does. >> } >> >> @deprecated // In the future we could remove this interface >> interface BoundedOneInput extends BoundedInput {} >> >> interface BoundedMultiInput extends BoundedInput { >> void endInput(int i); >> >> default void endInput() {} // For compatibility >> } >> >> If operator/UDF does not care about the end of a single input, then it >> could directly implement the BoundedInput interface. The possible >> benefit to me is that we might be able to keep only one concept for >> marking the end of stream, especially for the operators with only >> one input. >> >> Very thanks for all the deep insights and discussions! >> >> Best, >> Yun >> >> ------------------------------------------------------------------ >> From:Dawid Wysakowicz <dwysakow...@apache.org> >> Send Time:2021 Jun. 3 (Thu.) 21:21 >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun >> Gao <yungao...@aliyun.com> >> Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>; >> Stephan Ewen <se...@apache.org> >> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished >> >> Hi all, >> >> Thanks for the very insightful discussion. I'd like to revive the effort >> of FLIP-147. First of all, from my side I'd like to say that I am really >> interested in helping that happen in the upcoming 1.14 release. >> >> I agree with Till that the final checkpoints and global commits are >> mostly orthogonal. Similarly as Till, I'd suggest to first focus on the >> final checkpoints, while just keeping in mind we should not make >> assumptions that would make it impossible to implement the global commits. >> So far I do not see such risk from the discussion. >> >> Going back to the final checkpoints issue. I think the only outstanding >> issue is which methods we want to use for flushing/closing both operators >> and UDFs just before performing the final checkpoint. As pointed out to me >> by Piotr, I am mentioning UDFs here as well, because we need a way for >> users using the Public API to benefit from the final checkpoint (bear in >> mind that e.g. TwoPhaseCommitSinkFunction which is implemented by our Kafka >> sink operates on the UDF level). Right now RichFunction has no method which >> could be called just before the final checkpoint that would say "flush" all >> intermediate state now and prepare for the final checkpoint. I'd suggest >> introducing an additional interface e.g. (name to be determined) >> >> interface Flushable<T> { >> >> void flush(Collector<T> out) >> >> } >> >> Additionally we would need to introduce a similar method on the >> StreamOperator level. Currently we have two methods that are called at the >> end of operator lifecycle: >> >> - >> - close >> - dispose >> >> The usage of the two methods is a bit confusing. Dispose is responsible >> for closing all open resources and is supposed to be called in case of a >> failure. On the other hand the close is a combination of a non-existent >> "flush" method we need and dispose for closing resources in case of a >> successful run. I'd suggest to clear it a bit. We would introduce a proper >> "flush" method which would be called in case of a successful finishing of >> an operator. Moreover we would make "close" deal only with closing any open >> resources, basically taking over the role of the dispose, which we would >> deprecate. >> >> Lastly, I'd like to say why I think it is better introduce a new "flush" >> method instead of using the "endInput" method of BoundedOne/MultiInput. >> That method is called per input, which means each operator would need to >> keep track of which inputs were already closed internally and react >> differently if all of the inputs were closed. With an explicit "flush" >> method we do not have such a problem as the input bookkeeping happens on >> the StreamTask level. >> >> Let me know what you think. I'd sync with Yun Gao and if there are no >> objections we will extend the FLIP page with necessary changes. >> >> Best, >> >> Dawid >> >> >>