Hi Piot,

I'm fine with just doing it on the Sink. My responses were focused on the
API (the how) not on the concept (the if). Just keep the methods on the
different places in sync, such that it is easy to introduce a common
interface later.

Re name: drain is not a reinvention as it's used quite often throughout
Flink (e.g., Mailbox, stopWithSavepoint with drain flag). flush has no link
to life-cycles: you usually do it arbitrarily often.
I like `finish` very much as it relates to JobStatus FINISHED, has a clear
tie to life-cycles, and is crisp.
I also thought about `terminate` but I'd clearly favor `finish` as the
verbs cannot be exchanged in the following: the task may terminate its
operators but the operators should finish their thing first.

On Wed, Jun 9, 2021 at 6:48 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> Arvid: What's the problem with providing `void flush()`/`void drain()` only
> in the `SinkFunction`? It would avoid the problem of typing. Why would one
> need to have it in the other `Rich***Function`s? For `flush()` to make
> sense, the entity which has this method, would need to buffer some data. I
> don't see this to be reasonable in anything but `SinkFunction`,
> `ProcessFunction` and operators.
>
> Re `flush()` vs `drain()`. Frankly, I wouldn't understand what `drain()` is
> all about without reading the java-doc, and afterwards, I would have an
> impression that someone wanted to reinvent a wheel :) `flush()` is kind of
> an industry standard for things like that. Furthermore I don't think
> `drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()`
> would be better in this regard, but it also doesn't feel right. Maybe
> `finish()`?
>
> Piotrek
>
> śr., 9 cze 2021 o 11:51 Arvid Heise <ar...@apache.org> napisał(a):
>
> > Hi Dawid,
> >
> > I see your point. I'd probably add drain only to Rich*Function where we
> > have the type bounds. Then we still need your Flushable<T> interface in
> > Rich*Function<..., T> to call it efficiently but we at least avoid weird
> > type combinations. I'll have a rethink later.
> >
> > The proper solution is probably to add <OUT> to RichFunction and use Void
> > for RichSinkFunction but I'll have to understand the implications first.
> >
> > On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz <dwysakow...@apache.org
> >
> > wrote:
> >
> >> Hey,
> >>
> >> @Arvid The problem with adding the "drain/flush/stopProcessing" method
> to
> >> RichFunction is that it is not typed with the output type. At the same
> time
> >> we would most likely need a way to emit records from the method. That's
> >> originally thought about adding a typed interface which honestly I don't
> >> like that much either.
> >>
> >> On the UDF level we do not need to deprecate anything as you said. The
> >> close there already works as dispose on the Operator level. What we are
> >> suggesting is to unify that on the Operator level and deprecate the
> dispose
> >> there. @Yun I think we can already do that. We can either try to handle
> >> exceptions from the close in the case of a failure or just break it as
> it
> >> is a low-level, mostly internal API as Arvid said and also the migration
> >> would be really easy there.
> >>
> >> @Till @Arvid I am open for suggestions about the naming. I like the
> >> "drain" method.
> >>
> >> For now I'd go with @Piotr's proposal to add the "drain" method only to
> >> the SinkFunction. We think they are not immediately necessary for any of
> >> the other UDFs.
> >>
> >> Best,
> >>
> >> Dawid
> >> On 09/06/2021 11:20, Arvid Heise wrote:
> >>
> >> I have not followed the complete discussion and can't comment on the
> >> concepts. However, I have some ideas on the API changes:
> >>
> >> 1. If it's about adding additional life-cycle methods to UDFs, we should
> >> add the flush/endOfInput to RichFunction as this is the current way to
> >> define it. At this point, I don't see the need to add/change anything
> for
> >> UDFs. Since RichFunction does not have a dispose, do we even need to
> >> deprecate anything on UDF level? This would avoid having a new interface
> >> Flushable altogether (of which I'm not a big fan, see Piot's mail)
> >>
> >> 2. Further, I'd like to propose drain instead of flush as it would more
> >> align with the current nomenclature and makes the intent more obvious.
> >> However, that only works if there is no clash, so please double-check.
> >>
> >> 3. About changing methods on Operators: I'd say go ahead. It's
> >> experimental and not too hard to adjust on the user side. I also like
> the
> >> idea of beefing up ProcessFunction as a full replacement to custom
> >> operators but I'd keep that effort separate.
> >>
> >> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann <trohrm...@apache.org>
> >> wrote:
> >>
> >>> Thanks for the lively discussion everyone. I have to admit that I am
> not
> >>> really convinced that we should call the interface Flushable and the
> >>> method
> >>> flush. The problem is that this method should in the first place tell
> the
> >>> operator that it should stop processing and flush all buffered data.
> The
> >>> method "flush" alone does not convey this contract very well. Maybe a
> >>> more
> >>> explicit name like stopProcessingAndFlush (maybe only stopProcessing)
> >>> would
> >>> be better. Moreover, from the OutputStream.flush method, I would expect
> >>> that I can call this method multiple times w/o changing the state of
> the
> >>> stream. This is not the case here.
> >>>
> >>> Given that the stop processing and flush all results is such an
> essential
> >>> lifecycle method of an operator/UDF, I am not sure whether we should
> >>> offer
> >>> it as an optional interface users can implement. The problem I see is
> >>> that
> >>> users are not aware of it when writing their own operators/UDFs. Making
> >>> it
> >>> part of the actual interfaces makes it more explicit and easier to
> >>> discover. Maybe there is a way of adding it together with a default
> >>> implementation, deprecating the other methods, and then eventually
> >>> removing
> >>> the old methods. The last step will break APIs, though :-(
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org>
> >>> wrote:
> >>>
> >>> > Hi,
> >>> >
> >>> > Thanks for resuming this discussion. I think +1 for the proposal of
> >>> > dropping (deprecating) `dispose()`, and adding `flush()` to the
> >>> > `StreamOperator`/udfs. Semantically it would be more like new
> >>> `close()` is
> >>> > an equivalent of old `dispose()`. Old `close()` is an equivalent of
> new
> >>> > `flush() + close()`. I think it provides a relatively painless
> >>> migration
> >>> > path (could we write down this migration?).
> >>> >
> >>> > However I have some doubts about the Flushable<T> interface. First of
> >>> all,
> >>> > it wouldn't work for sinks - sinks have no output. Secondly, I don't
> >>> like
> >>> > that it opens a possibility for problems like this (incompatible
> output
> >>> > types):
> >>> > ```
> >>> > public class MyMap implements MapFunction<String, Long>,
> >>> Flushable<Double>
> >>> > { ...}
> >>> > ```
> >>> >
> >>> > Also after a quick offline discussion with Dawid, I'm not sure
> anymore
> >>> to
> >>> > which UDFs it actually makes sense to add `flush`, as most of them
> >>> > shouldn't buffer any data. Apart from Sinks, it's usually an operator
> >>> that
> >>> > is buffering the data (that holds true for AsyncFunction,
> >>> ReduceFunction,
> >>> > AggregateFunction, MapFunction, FilterFunction, ...). For those
> >>> functions
> >>> > it's difficult to buffer any data, as they have no means to control
> >>> when to
> >>> > emit this data. One notable exception might be (Co)ProcessFunction,
> as
> >>> it
> >>> > can register timers on it's own. In that case I would propose to do
> the
> >>> > following thing:
> >>> > 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface
> >>> already
> >>> > has flush capabilities)
> >>> > 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but
> maybe
> >>> we
> >>> > can postpone it
> >>> > 3. Leave other functions alone.
> >>> >
> >>> > After all, we could add `flush()` to other functions in the future if
> >>> we
> >>> > really find a good motivating example to do so.
> >>> >
> >>> > About 2. Dawid is pitching an idea to convert `ProcessFunction` into
> a
> >>> > proper `Public` API that would replace StreamOperator. We could
> change
> >>> > `StreamOperator` to be purely `@Internal` class/interface, and add
> the
> >>> > missing functionality to the `ProcessFunction` (InputSelectable,
> >>> > BoundedInput, MailboxExecutor). With this, adding `flush()` to
> >>> > `ProcessFunction` would make a lot of sense. But maybe that should
> be a
> >>> > story for another day?
> >>> >
> >>> > Best,
> >>> > Piotrek
> >>> >
> >>> > pt., 4 cze 2021 o 10:36 Yun Gao <yungao...@aliyun.com> napisał(a):
> >>> >
> >>> >> Hi all,
> >>> >>
> >>> >> Very thanks @Dawid for resuming the discussion and very thanks @Till
> >>> for
> >>> >> the summary ! (and very sorry for I missed the mail and do not
> >>> response
> >>> >> in time...)
> >>> >>
> >>> >> I also agree with that we could consider the global commits latter
> >>> >> separately after we have addressed the final checkpoints, and also
> >>> other
> >>> >> points as Till summarized.
> >>> >> Currently the only case that have used the cascade commit is the
> Table
> >>> >> FileSystem and Hive connectors. I checked the code and found
> >>> currently they
> >>> >> will commit the
> >>> >> last piece of data directly  in endOfInput(). Although this might
> emit
> >>> >> repeat records if there are failover during job finishing, it avoids
> >>> >> emitting the records in the
> >>> >> notifyCheckpointComplete() after endOfInput(), thus the modification
> >>> to
> >>> >> the operator lifecycle in final checkpoints would cause
> compatibility
> >>> >> problem for these connectors,
> >>> >> thus we do not need to modify them at the first place.
> >>> >>
> >>> >> 2. Regarding the operator lifecycle, I also agree with the proposed
> >>> >> changes. To sum up, I think the operator lifecycle would become
> >>> >>
> >>> >> endOfInput(1)
> >>> >> ...
> >>> >> endOfInput(n)
> >>> >> flush() --> call UDF's flush method
> >>> >> if some operator requires final checkpoints
> >>> >>     snapshotState()
> >>> >>     notifyCheckpointComplete()
> >>> >> end if
> >>> >> close() --> call UDF's close method
> >>> >>
> >>> >> Since currently the close() is only called in normal finish and
> >>> dispose()
> >>> >> will be called in both failover and normal case, for compatibility,
> I
> >>> think
> >>> >> we may
> >>> >> have to postpone the change to a single close() method to version
> 2.0
> >>> ?
> >>> >>
> >>> >> 3. Regarding the name and position of flush() method, I also agree
> >>> with
> >>> >> that we will need a separate method to mark the termination of the
> >>> whole
> >>> >> stream for
> >>> >> multiple-input streams. Would it be also ok if we have some
> >>> modification
> >>> >> to the current BoundedXXInput interfaces to
> >>> >>
> >>> >> interface BoundedInput {
> >>> >>     void endInput() // marks the end of the whole streams, as
> flush()
> >>> >> does.
> >>> >> }
> >>> >>
> >>> >> @deprecated // In the future we could remove this interface
> >>> >> interface BoundedOneInput extends BoundedInput {}
> >>> >>
> >>> >> interface BoundedMultiInput extends BoundedInput {
> >>> >>       void endInput(int i);
> >>> >>
> >>> >>       default void endInput() {} // For compatibility
> >>> >> }
> >>> >>
> >>> >> If operator/UDF does not care about the end of a single input, then
> it
> >>> >> could directly implement the BoundedInput interface. The possible
> >>> >> benefit to me is that we might be able to keep only one concept for
> >>> >> marking the end of stream, especially for the operators with only
> >>> >> one input.
> >>> >>
> >>> >> Very thanks for all the deep insights and discussions!
> >>> >>
> >>> >> Best,
> >>> >> Yun
> >>> >>
> >>> >> ------------------------------------------------------------------
> >>> >> From:Dawid Wysakowicz <dwysakow...@apache.org>
> >>> >> Send Time:2021 Jun. 3 (Thu.) 21:21
> >>> >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org
> >;
> >>> Yun
> >>> >> Gao <yungao...@aliyun.com>
> >>> >> Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <
> >>> guowei....@gmail.com>;
> >>> >> Stephan Ewen <se...@apache.org>
> >>> >> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> >>> Finished
> >>> >>
> >>> >> Hi all,
> >>> >>
> >>> >> Thanks for the very insightful discussion. I'd like to revive the
> >>> effort
> >>> >> of FLIP-147. First of all, from my side I'd like to say that I am
> >>> really
> >>> >> interested in helping that happen in the upcoming 1.14 release.
> >>> >>
> >>> >> I agree with Till that the final checkpoints and global commits are
> >>> >> mostly orthogonal. Similarly as Till, I'd suggest to first focus on
> >>> the
> >>> >> final checkpoints, while just keeping in mind we should not make
> >>> >> assumptions that would make it impossible to implement the global
> >>> commits.
> >>> >> So far I do not see such risk from the discussion.
> >>> >>
> >>> >> Going back to the final checkpoints issue. I think the only
> >>> outstanding
> >>> >> issue is which methods we want to use for flushing/closing both
> >>> operators
> >>> >> and UDFs just before performing the final checkpoint. As pointed out
> >>> to me
> >>> >> by Piotr, I am mentioning UDFs here as well, because we need a way
> for
> >>> >> users using the Public API to benefit from the final checkpoint
> (bear
> >>> in
> >>> >> mind that e.g. TwoPhaseCommitSinkFunction which is implemented by
> our
> >>> Kafka
> >>> >> sink operates on the UDF level). Right now RichFunction has no
> method
> >>> which
> >>> >> could be called just before the final checkpoint that would say
> >>> "flush" all
> >>> >> intermediate state now and prepare for the final checkpoint. I'd
> >>> suggest
> >>> >> introducing an additional interface e.g. (name to be determined)
> >>> >>
> >>> >> interface Flushable<T> {
> >>> >>
> >>> >>    void flush(Collector<T> out)
> >>> >>
> >>> >> }
> >>> >>
> >>> >> Additionally we would need to introduce a similar method on the
> >>> >> StreamOperator level. Currently we have two methods that are called
> >>> at the
> >>> >> end of operator lifecycle:
> >>> >>
> >>> >>    -
> >>> >>    - close
> >>> >>    - dispose
> >>> >>
> >>> >> The usage of the two methods is a bit confusing. Dispose is
> >>> responsible
> >>> >> for closing all open resources and is supposed to be called in case
> >>> of a
> >>> >> failure. On the other hand the close is a combination of a
> >>> non-existent
> >>> >> "flush" method we need and dispose for closing resources in case of
> a
> >>> >> successful run. I'd suggest to clear it a bit. We would introduce a
> >>> proper
> >>> >> "flush" method which would be called in case of a successful
> >>> finishing of
> >>> >> an operator. Moreover we would make "close" deal only with closing
> >>> any open
> >>> >> resources, basically taking over the role of the dispose, which we
> >>> would
> >>> >> deprecate.
> >>> >>
> >>> >> Lastly, I'd like to say why I think it is better introduce a new
> >>> "flush"
> >>> >> method instead of using the "endInput" method of
> >>> BoundedOne/MultiInput.
> >>> >> That method is called per input, which means each operator would
> need
> >>> to
> >>> >> keep track of which inputs were already closed internally and react
> >>> >> differently if all of the inputs were closed. With an explicit
> "flush"
> >>> >> method we do not have such a problem as the input bookkeeping
> happens
> >>> on
> >>> >> the StreamTask level.
> >>> >>
> >>> >> Let me know what you think. I'd sync with Yun Gao and if there are
> no
> >>> >> objections we will extend the FLIP page with necessary changes.
> >>> >>
> >>> >> Best,
> >>> >>
> >>> >> Dawid
> >>> >>
> >>> >>
> >>> >>
> >>>
> >>
>

Reply via email to