Hi,

Arvid: What's the problem with providing `void flush()`/`void drain()` only
in the `SinkFunction`? It would avoid the problem of typing. Why would one
need to have it in the other `Rich***Function`s? For `flush()` to make
sense, the entity that has this method would need to buffer some data. I
don't see that being reasonable in anything but `SinkFunction`,
`ProcessFunction` and operators.
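
To make the typing point concrete, here is a rough sketch of what I have in mind. It does not use Flink's real classes: `SimpleSink`, `BufferingSink` and the `written` list are made-up stand-ins, and the only point is that a sink's `flush()` needs neither an output type parameter nor a `Collector`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; these types are NOT Flink's actual API.
class FlushSketch {

    /** Stand-in for a sink function: it has an input type but no output type. */
    interface SimpleSink<IN> {
        void invoke(IN value);

        /** No Collector argument is needed, because a sink emits nothing downstream. */
        default void flush() {}
    }

    /** A sink that buffers records and only writes them out on flush(). */
    static class BufferingSink implements SimpleSink<String> {
        private final List<String> buffer = new ArrayList<>();
        // Stands in for the external system the sink writes to (assumption).
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            buffer.add(value);
        }

        @Override
        public void flush() {
            written.addAll(buffer);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.invoke("a");
        sink.invoke("b");
        sink.flush();
        System.out.println(sink.written); // prints [a, b]
    }
}
```

Since the interface carries only the input type, there is no way to declare a mismatching output type on it.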

Re `flush()` vs `drain()`: frankly, I wouldn't understand what `drain()` is
all about without reading the Javadoc, and afterwards I would have the
impression that someone wanted to reinvent the wheel :) `flush()` is kind of
an industry standard for things like that. Furthermore I don't think
`drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()`
would be better in this regard, but it also doesn't feel right. Maybe
`finish()`?
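
As a rough illustration of the contract I would read into a name like `finish()`: it is called once, it writes out whatever is buffered, and no records are accepted afterwards. This is only my assumption about the semantics, and `OneShotSink` below is a made-up class, not anything from Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; the one-shot semantics are an assumption for discussion.
class FinishSketch {

    static class OneShotSink {
        private final List<String> buffer = new ArrayList<>();
        // Stands in for the external system the sink writes to (assumption).
        final List<String> written = new ArrayList<>();
        private boolean finished = false;

        void invoke(String value) {
            if (finished) {
                throw new IllegalStateException("record received after finish()");
            }
            buffer.add(value);
        }

        /** Unlike OutputStream.flush(), this changes state: processing stops here. */
        void finish() {
            if (finished) {
                throw new IllegalStateException("finish() called twice");
            }
            written.addAll(buffer);
            buffer.clear();
            finished = true;
        }
    }

    public static void main(String[] args) {
        OneShotSink sink = new OneShotSink();
        sink.invoke("x");
        sink.finish();
        System.out.println(sink.written); // prints [x]
    }
}
```

Whether a second call should throw or just be a no-op is a separate question; the sketch throws only to make the "stop + flush" contract visible.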

Piotrek

On Wed, 9 Jun 2021 at 11:51, Arvid Heise <ar...@apache.org> wrote:

> Hi Dawid,
>
> I see your point. I'd probably add drain only to Rich*Function where we
> have the type bounds. Then we still need your Flushable<T> interface in
> Rich*Function<..., T> to call it efficiently but we at least avoid weird
> type combinations. I'll have a rethink later.
>
> The proper solution is probably to add <OUT> to RichFunction and use Void
> for RichSinkFunction but I'll have to understand the implications first.
>
> On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hey,
>>
>> @Arvid The problem with adding the "drain/flush/stopProcessing" method to
>> RichFunction is that it is not typed with the output type. At the same time
>> we would most likely need a way to emit records from the method. That's
>> why I originally thought about adding a typed interface, which honestly I
>> don't like that much either.
>>
>> On the UDF level we do not need to deprecate anything as you said. The
>> close there already works as dispose on the Operator level. What we are
>> suggesting is to unify that on the Operator level and deprecate the dispose
>> there. @Yun I think we can already do that. We can either try to handle
>> exceptions from the close in the case of a failure or just break it as it
>> is a low-level, mostly internal API as Arvid said and also the migration
>> would be really easy there.
>>
>> @Till @Arvid I am open to suggestions about the naming. I like the
>> "drain" method.
>>
>> For now I'd go with @Piotr's proposal to add the "drain" method only to
>> the SinkFunction. We think it is not immediately necessary for any of
>> the other UDFs.
>>
>> Best,
>>
>> Dawid
>> On 09/06/2021 11:20, Arvid Heise wrote:
>>
>> I have not followed the complete discussion and can't comment on the
>> concepts. However, I have some ideas on the API changes:
>>
>> 1. If it's about adding additional life-cycle methods to UDFs, we should
>> add the flush/endOfInput to RichFunction as this is the current way to
>> define it. At this point, I don't see the need to add/change anything for
>> UDFs. Since RichFunction does not have a dispose, do we even need to
>> deprecate anything on UDF level? This would avoid having a new interface
>> Flushable altogether (of which I'm not a big fan, see Piotr's mail).
>>
>> 2. Further, I'd like to propose drain instead of flush, as it would align
>> better with the current nomenclature and make the intent more obvious.
>> However, that only works if there is no clash, so please double-check.
>>
>> 3. About changing methods on Operators: I'd say go ahead. It's
>> experimental and not too hard to adjust on the user side. I also like the
>> idea of beefing up ProcessFunction as a full replacement to custom
>> operators but I'd keep that effort separate.
>>
>> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Thanks for the lively discussion everyone. I have to admit that I am not
>>> really convinced that we should call the interface Flushable and the method
>>> flush. The problem is that this method should in the first place tell the
>>> operator that it should stop processing and flush all buffered data. The
>>> method "flush" alone does not convey this contract very well. Maybe a more
>>> explicit name like stopProcessingAndFlush (maybe only stopProcessing) would
>>> be better. Moreover, from the OutputStream.flush method, I would expect
>>> that I can call this method multiple times w/o changing the state of the
>>> stream. This is not the case here.
>>>
>>> Given that the stop processing and flush all results is such an essential
>>> lifecycle method of an operator/UDF, I am not sure whether we should offer
>>> it as an optional interface users can implement. The problem I see is that
>>> users are not aware of it when writing their own operators/UDFs. Making it
>>> part of the actual interfaces makes it more explicit and easier to
>>> discover. Maybe there is a way of adding it together with a default
>>> implementation, deprecating the other methods, and then eventually removing
>>> the old methods. The last step will break APIs, though :-(
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>> > Hi,
>>> >
>>> > Thanks for resuming this discussion. I think +1 for the proposal of
>>> > dropping (deprecating) `dispose()`, and adding `flush()` to the
>>> > `StreamOperator`/UDFs. Semantically, the new `close()` would be the
>>> > equivalent of the old `dispose()`, and the old `close()` would be the
>>> > equivalent of the new `flush() + close()`. I think it provides a
>>> > relatively painless migration path (could we write down this migration?).
>>> >
>>> > However I have some doubts about the Flushable<T> interface. First of all,
>>> > it wouldn't work for sinks - sinks have no output. Secondly, I don't like
>>> > that it opens a possibility for problems like this (incompatible output
>>> > types):
>>> > ```
>>> > public class MyMap implements MapFunction<String, Long>, Flushable<Double>
>>> > { ...}
>>> > ```
>>> >
>>> > Also after a quick offline discussion with Dawid, I'm not sure anymore to
>>> > which UDFs it actually makes sense to add `flush`, as most of them
>>> > shouldn't buffer any data. Apart from Sinks, it's usually an operator that
>>> > is buffering the data (that holds true for AsyncFunction, ReduceFunction,
>>> > AggregateFunction, MapFunction, FilterFunction, ...). For those functions
>>> > it's difficult to buffer any data, as they have no means to control when to
>>> > emit this data. One notable exception might be (Co)ProcessFunction, as it
>>> > can register timers on its own. In that case I would propose to do the
>>> > following thing:
>>> > 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface already
>>> > has flush capabilities)
>>> > 2. Maybe add `flush(Collector<O>)` to `(Co)ProcessFunction`, but maybe we
>>> > can postpone it
>>> > 3. Leave other functions alone.
>>> >
>>> > After all, we could add `flush()` to other functions in the future if we
>>> > really find a good motivating example to do so.
>>> >
>>> > About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
>>> > proper `Public` API that would replace StreamOperator. We could change
>>> > `StreamOperator` to be purely `@Internal` class/interface, and add the
>>> > missing functionality to the `ProcessFunction` (InputSelectable,
>>> > BoundedInput, MailboxExecutor). With this, adding `flush()` to
>>> > `ProcessFunction` would make a lot of sense. But maybe that should be a
>>> > story for another day?
>>> >
>>> > Best,
>>> > Piotrek
>>> >
>>> > On Fri, 4 Jun 2021 at 10:36, Yun Gao <yungao...@aliyun.com> wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> Many thanks @Dawid for resuming the discussion and many thanks @Till for
>>> >> the summary! (And very sorry that I missed the mail and did not respond
>>> >> in time...)
>>> >>
>>> >> I also agree that we could consider the global commits later,
>>> >> separately, after we have addressed the final checkpoints, and also the
>>> >> other points as Till summarized.
>>> >> Currently the only case that uses the cascade commit is the Table
>>> >> FileSystem and Hive connectors. I checked the code and found that
>>> >> currently they commit the last piece of data directly in endOfInput().
>>> >> Although this might emit repeated records if there is a failover while
>>> >> the job is finishing, it avoids emitting the records in
>>> >> notifyCheckpointComplete() after endOfInput(). Thus the modification to
>>> >> the operator lifecycle in final checkpoints would not cause compatibility
>>> >> problems for these connectors, and we do not need to modify them in the
>>> >> first place.
>>> >>
>>> >> 2. Regarding the operator lifecycle, I also agree with the proposed
>>> >> changes. To sum up, I think the operator lifecycle would become
>>> >>
>>> >> endOfInput(1)
>>> >> ...
>>> >> endOfInput(n)
>>> >> flush() --> call UDF's flush method
>>> >> if some operator requires final checkpoints
>>> >>     snapshotState()
>>> >>     notifyCheckpointComplete()
>>> >> end if
>>> >> close() --> call UDF's close method
>>> >>
>>> >> Since currently close() is only called on a normal finish, while dispose()
>>> >> is called in both the failover and the normal case, for compatibility I
>>> >> think we may have to postpone the change to a single close() method to
>>> >> version 2.0?
>>> >>
>>> >> 3. Regarding the name and position of the flush() method, I also agree
>>> >> that we will need a separate method to mark the termination of the whole
>>> >> stream for multiple-input streams. Would it also be ok if we made some
>>> >> modifications to the current BoundedXXInput interfaces, like this:
>>> >>
>>> >> interface BoundedInput {
>>> >>     // Marks the end of the whole stream, as flush() does.
>>> >>     void endInput();
>>> >> }
>>> >>
>>> >> @Deprecated // In the future we could remove this interface
>>> >> interface BoundedOneInput extends BoundedInput {}
>>> >>
>>> >> interface BoundedMultiInput extends BoundedInput {
>>> >>     void endInput(int i);
>>> >>
>>> >>     default void endInput() {} // For compatibility
>>> >> }
>>> >>
>>> >> If operator/UDF does not care about the end of a single input, then it
>>> >> could directly implement the BoundedInput interface. The possible
>>> >> benefit to me is that we might be able to keep only one concept for
>>> >> marking the end of stream, especially for the operators with only
>>> >> one input.
>>> >>
>>> >> Many thanks for all the deep insights and discussions!
>>> >>
>>> >> Best,
>>> >> Yun
>>> >>
>>> >> ------------------------------------------------------------------
>>> >> From:Dawid Wysakowicz <dwysakow...@apache.org>
>>> >> Send Time:2021 Jun. 3 (Thu.) 21:21
>>> >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun
>>> >> Gao <yungao...@aliyun.com>
>>> >> Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>;
>>> >> Stephan Ewen <se...@apache.org>
>>> >> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>>> >>
>>> >> Hi all,
>>> >>
>>> >> Thanks for the very insightful discussion. I'd like to revive the effort
>>> >> of FLIP-147. First of all, from my side I'd like to say that I am really
>>> >> interested in helping that happen in the upcoming 1.14 release.
>>> >>
>>> >> I agree with Till that the final checkpoints and global commits are
>>> >> mostly orthogonal. Like Till, I'd suggest focusing first on the final
>>> >> checkpoints, while keeping in mind that we should not make assumptions
>>> >> that would make it impossible to implement the global commits. So far I
>>> >> do not see such a risk from the discussion.
>>> >>
>>> >> Going back to the final checkpoints issue. I think the only outstanding
>>> >> issue is which methods we want to use for flushing/closing both operators
>>> >> and UDFs just before performing the final checkpoint. As pointed out to me
>>> >> by Piotr, I am mentioning UDFs here as well, because we need a way for
>>> >> users using the Public API to benefit from the final checkpoint (bear in
>>> >> mind that e.g. TwoPhaseCommitSinkFunction, which is implemented by our
>>> >> Kafka sink, operates on the UDF level). Right now RichFunction has no
>>> >> method which could be called just before the final checkpoint that would
>>> >> say "flush all intermediate state now and prepare for the final
>>> >> checkpoint". I'd suggest introducing an additional interface, e.g. (name
>>> >> to be determined):
>>> >>
>>> >> interface Flushable<T> {
>>> >>
>>> >>    void flush(Collector<T> out);
>>> >>
>>> >> }
>>> >>
>>> >> Additionally we would need to introduce a similar method on the
>>> >> StreamOperator level. Currently we have two methods that are called at the
>>> >> end of operator lifecycle:
>>> >>
>>> >>    - close
>>> >>    - dispose
>>> >>
>>> >> The usage of the two methods is a bit confusing. Dispose is responsible
>>> >> for closing all open resources and is supposed to be called in case of a
>>> >> failure. On the other hand, close is a combination of the non-existent
>>> >> "flush" method we need and dispose, for closing resources in case of a
>>> >> successful run. I'd suggest cleaning this up a bit. We would introduce a
>>> >> proper "flush" method which would be called when an operator finishes
>>> >> successfully. Moreover we would make "close" deal only with closing any
>>> >> open resources, basically taking over the role of dispose, which we would
>>> >> deprecate.
>>> >>
>>> >> Lastly, I'd like to say why I think it is better to introduce a new
>>> >> "flush" method instead of using the "endInput" method of
>>> >> BoundedOne/MultiInput. That method is called per input, which means each
>>> >> operator would need to keep track of which inputs were already closed
>>> >> internally and react differently if all of the inputs were closed. With an
>>> >> explicit "flush" method we do not have such a problem, as the input
>>> >> bookkeeping happens on the StreamTask level.
>>> >>
>>> >> Let me know what you think. I'd sync with Yun Gao and if there are no
>>> >> objections we will extend the FLIP page with necessary changes.
>>> >>
>>> >> Best,
>>> >>
>>> >> Dawid
>>> >>
>>> >>
>>> >>
>>>
>>
