Hi Dawid,

I see your point. I'd probably add drain only to Rich*Function where we
have the type bounds. Then we still need your Flushable<T> interface in
Rich*Function<..., T> to call it efficiently but we at least avoid weird
type combinations. I'll have a rethink later.

The proper solution is probably to add <OUT> to RichFunction and use Void
for RichSinkFunction but I'll have to understand the implications first.

On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hey,
>
> @Arvid The problem with adding the "drain/flush/stopProcessing" method to
> RichFunction is that it is not typed with the output type. At the same time
> we would most likely need a way to emit records from the method. That's
> originally thought about adding a typed interface which honestly I don't
> like that much either.
>
> On the UDF level we do not need to deprecate anything as you said. The
> close there already works as dispose on the Operator level. What we are
> suggesting is to unify that on the Operator level and deprecate the dispose
> there. @Yun I think we can already do that. We can either try to handle
> exceptions from the close in the case of a failure or just break it as it
> is a low-level, mostly internal API as Arvid said and also the migration
> would be really easy there.
>
> @Till @Arvid I am open for suggestions about the naming. I like the
> "drain" method.
>
> For now I'd go with @Piotr's proposal to add the "drain" method only to
> the SinkFunction. We think they are not immediately necessary for any of
> the other UDFs.
>
> Best,
>
> Dawid
> On 09/06/2021 11:20, Arvid Heise wrote:
>
> I have not followed the complete discussion and can't comment on the
> concepts. However, I have some ideas on the API changes:
>
> 1. If it's about adding additional life-cycle methods to UDFs, we should
> add the flush/endOfInput to RichFunction as this is the current way to
> define it. At this point, I don't see the need to add/change anything for
> UDFs. Since RichFunction does not have a dispose, do we even need to
> deprecate anything on UDF level? This would avoid having a new interface
> Flushable altogether (of which I'm not a big fan, see Piot's mail)
>
> 2. Further, I'd like to propose drain instead of flush as it would more
> align with the current nomenclature and makes the intent more obvious.
> However, that only works if there is no clash, so please double-check.
>
> 3. About changing methods on Operators: I'd say go ahead. It's
> experimental and not too hard to adjust on the user side. I also like the
> idea of beefing up ProcessFunction as a full replacement to custom
> operators but I'd keep that effort separate.
>
> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Thanks for the lively discussion everyone. I have to admit that I am not
>> really convinced that we should call the interface Flushable and the
>> method
>> flush. The problem is that this method should in the first place tell the
>> operator that it should stop processing and flush all buffered data. The
>> method "flush" alone does not convey this contract very well. Maybe a more
>> explicit name like stopProcessingAndFlush (maybe only stopProcessing)
>> would
>> be better. Moreover, from the OutputStream.flush method, I would expect
>> that I can call this method multiple times w/o changing the state of the
>> stream. This is not the case here.
>>
>> Given that the stop processing and flush all results is such an essential
>> lifecycle method of an operator/UDF, I am not sure whether we should offer
>> it as an optional interface users can implement. The problem I see is that
>> users are not aware of it when writing their own operators/UDFs. Making it
>> part of the actual interfaces makes it more explicit and easier to
>> discover. Maybe there is a way of adding it together with a default
>> implementation, deprecating the other methods, and then eventually
>> removing
>> the old methods. The last step will break APIs, though :-(
>>
>> Cheers,
>> Till
>>
>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>> > Hi,
>> >
>> > Thanks for resuming this discussion. I think +1 for the proposal of
>> > dropping (deprecating) `dispose()`, and adding `flush()` to the
>> > `StreamOperator`/udfs. Semantically it would be more like new `close()`
>> is
>> > an equivalent of old `dispose()`. Old `close()` is an equivalent of new
>> > `flush() + close()`. I think it provides a relatively painless migration
>> > path (could we write down this migration?).
>> >
>> > However I have some doubts about the Flushable<T> interface. First of
>> all,
>> > it wouldn't work for sinks - sinks have no output. Secondly, I don't
>> like
>> > that it opens a possibility for problems like this (incompatible output
>> > types):
>> > ```
>> > public class MyMap implements MapFunction<String, Long>,
>> Flushable<Double>
>> > { ...}
>> > ```
>> >
>> > Also after a quick offline discussion with Dawid, I'm not sure anymore
>> to
>> > which UDFs it actually makes sense to add `flush`, as most of them
>> > shouldn't buffer any data. Apart from Sinks, it's usually an operator
>> that
>> > is buffering the data (that holds true for AsyncFunction,
>> ReduceFunction,
>> > AggregateFunction, MapFunction, FilterFunction, ...). For those
>> functions
>> > it's difficult to buffer any data, as they have no means to control
>> when to
>> > emit this data. One notable exception might be (Co)ProcessFunction, as
>> it
>> > can register timers on it's own. In that case I would propose to do the
>> > following thing:
>> > 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface
>> already
>> > has flush capabilities)
>> > 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but maybe
>> we
>> > can postpone it
>> > 3. Leave other functions alone.
>> >
>> > After all, we could add `flush()` to other functions in the future if we
>> > really find a good motivating example to do so.
>> >
>> > About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
>> > proper `Public` API that would replace StreamOperator. We could change
>> > `StreamOperator` to be purely `@Internal` class/interface, and add the
>> > missing functionality to the `ProcessFunction` (InputSelectable,
>> > BoundedInput, MailboxExecutor). With this, adding `flush()` to
>> > `ProcessFunction` would make a lot of sense. But maybe that should be a
>> > story for another day?
>> >
>> > Best,
>> > Piotrek
>> >
>> > pt., 4 cze 2021 o 10:36 Yun Gao <yungao...@aliyun.com> napisaƂ(a):
>> >
>> >> Hi all,
>> >>
>> >> Very thanks @Dawid for resuming the discussion and very thanks @Till
>> for
>> >> the summary ! (and very sorry for I missed the mail and do not response
>> >> in time...)
>> >>
>> >> I also agree with that we could consider the global commits latter
>> >> separately after we have addressed the final checkpoints, and also
>> other
>> >> points as Till summarized.
>> >> Currently the only case that have used the cascade commit is the Table
>> >> FileSystem and Hive connectors. I checked the code and found currently
>> they
>> >> will commit the
>> >> last piece of data directly  in endOfInput(). Although this might emit
>> >> repeat records if there are failover during job finishing, it avoids
>> >> emitting the records in the
>> >> notifyCheckpointComplete() after endOfInput(), thus the modification to
>> >> the operator lifecycle in final checkpoints would cause compatibility
>> >> problem for these connectors,
>> >> thus we do not need to modify them at the first place.
>> >>
>> >> 2. Regarding the operator lifecycle, I also agree with the proposed
>> >> changes. To sum up, I think the operator lifecycle would become
>> >>
>> >> endOfInput(1)
>> >> ...
>> >> endOfInput(n)
>> >> flush() --> call UDF's flush method
>> >> if some operator requires final checkpoints
>> >>     snapshotState()
>> >>     notifyCheckpointComplete()
>> >> end if
>> >> close() --> call UDF's close method
>> >>
>> >> Since currently the close() is only called in normal finish and
>> dispose()
>> >> will be called in both failover and normal case, for compatibility, I
>> think
>> >> we may
>> >> have to postpone the change to a single close() method to version 2.0 ?
>> >>
>> >> 3. Regarding the name and position of flush() method, I also agree with
>> >> that we will need a separate method to mark the termination of the
>> whole
>> >> stream for
>> >> multiple-input streams. Would it be also ok if we have some
>> modification
>> >> to the current BoundedXXInput interfaces to
>> >>
>> >> interface BoundedInput {
>> >>     void endInput() // marks the end of the whole streams, as flush()
>> >> does.
>> >> }
>> >>
>> >> @deprecated // In the future we could remove this interface
>> >> interface BoundedOneInput extends BoundedInput {}
>> >>
>> >> interface BoundedMultiInput extends BoundedInput {
>> >>       void endInput(int i);
>> >>
>> >>       default void endInput() {} // For compatibility
>> >> }
>> >>
>> >> If operator/UDF does not care about the end of a single input, then it
>> >> could directly implement the BoundedInput interface. The possible
>> >> benefit to me is that we might be able to keep only one concept for
>> >> marking the end of stream, especially for the operators with only
>> >> one input.
>> >>
>> >> Very thanks for all the deep insights and discussions!
>> >>
>> >> Best,
>> >> Yun
>> >>
>> >> ------------------------------------------------------------------
>> >> From:Dawid Wysakowicz <dwysakow...@apache.org>
>> >> Send Time:2021 Jun. 3 (Thu.) 21:21
>> >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>;
>> Yun
>> >> Gao <yungao...@aliyun.com>
>> >> Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <
>> guowei....@gmail.com>;
>> >> Stephan Ewen <se...@apache.org>
>> >> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
>> Finished
>> >>
>> >> Hi all,
>> >>
>> >> Thanks for the very insightful discussion. I'd like to revive the
>> effort
>> >> of FLIP-147. First of all, from my side I'd like to say that I am
>> really
>> >> interested in helping that happen in the upcoming 1.14 release.
>> >>
>> >> I agree with Till that the final checkpoints and global commits are
>> >> mostly orthogonal. Similarly as Till, I'd suggest to first focus on the
>> >> final checkpoints, while just keeping in mind we should not make
>> >> assumptions that would make it impossible to implement the global
>> commits.
>> >> So far I do not see such risk from the discussion.
>> >>
>> >> Going back to the final checkpoints issue. I think the only outstanding
>> >> issue is which methods we want to use for flushing/closing both
>> operators
>> >> and UDFs just before performing the final checkpoint. As pointed out
>> to me
>> >> by Piotr, I am mentioning UDFs here as well, because we need a way for
>> >> users using the Public API to benefit from the final checkpoint (bear
>> in
>> >> mind that e.g. TwoPhaseCommitSinkFunction which is implemented by our
>> Kafka
>> >> sink operates on the UDF level). Right now RichFunction has no method
>> which
>> >> could be called just before the final checkpoint that would say
>> "flush" all
>> >> intermediate state now and prepare for the final checkpoint. I'd
>> suggest
>> >> introducing an additional interface e.g. (name to be determined)
>> >>
>> >> interface Flushable<T> {
>> >>
>> >>    void flush(Collector<T> out)
>> >>
>> >> }
>> >>
>> >> Additionally we would need to introduce a similar method on the
>> >> StreamOperator level. Currently we have two methods that are called at
>> the
>> >> end of operator lifecycle:
>> >>
>> >>    -
>> >>    - close
>> >>    - dispose
>> >>
>> >> The usage of the two methods is a bit confusing. Dispose is responsible
>> >> for closing all open resources and is supposed to be called in case of
>> a
>> >> failure. On the other hand the close is a combination of a non-existent
>> >> "flush" method we need and dispose for closing resources in case of a
>> >> successful run. I'd suggest to clear it a bit. We would introduce a
>> proper
>> >> "flush" method which would be called in case of a successful finishing
>> of
>> >> an operator. Moreover we would make "close" deal only with closing any
>> open
>> >> resources, basically taking over the role of the dispose, which we
>> would
>> >> deprecate.
>> >>
>> >> Lastly, I'd like to say why I think it is better introduce a new
>> "flush"
>> >> method instead of using the "endInput" method of BoundedOne/MultiInput.
>> >> That method is called per input, which means each operator would need
>> to
>> >> keep track of which inputs were already closed internally and react
>> >> differently if all of the inputs were closed. With an explicit "flush"
>> >> method we do not have such a problem as the input bookkeeping happens
>> on
>> >> the StreamTask level.
>> >>
>> >> Let me know what you think. I'd sync with Yun Gao and if there are no
>> >> objections we will extend the FLIP page with necessary changes.
>> >>
>> >> Best,
>> >>
>> >> Dawid
>> >>
>> >>
>> >>
>>
>

Reply via email to