Thanks for the explanation. I like the factory pattern because it lets us make
the member variables immutable and final.
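
To illustrate what I mean, here is a minimal sketch. All names in it
(WatermarkGeneratorFactory, BoundedDelayGenerator, BoundedDelayFactory, the
"5000" config string) are made up for this example and are not the actual
FLIP-126 interfaces. Parsing the string only stands in for compiling generated
code. The point is that the expensive setup happens in the factory's create(),
so the generator receives the result in its constructor and keeps it in a final
field, without open() or lazy initialization.

```java
import java.io.Serializable;

// Hypothetical, simplified interfaces for illustration only;
// they are NOT the actual FLIP-126 API.
interface WatermarkOutput {
    void emitWatermark(long timestamp);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
}

// The factory is the serializable part that gets shipped to the workers.
// Calling create() there takes the place of an open() method.
interface WatermarkGeneratorFactory<T> extends Serializable {
    WatermarkGenerator<T> create();
}

// Everything the generator needs arrives via the constructor,
// so the field can be final and is never null or half-initialized.
final class BoundedDelayGenerator<T> implements WatermarkGenerator<T> {
    private final long maxDelayMillis;

    BoundedDelayGenerator(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        output.emitWatermark(eventTimestamp - maxDelayMillis);
    }
}

final class BoundedDelayFactory<T> implements WatermarkGeneratorFactory<T> {
    private final String delayConfig; // assumption: delay in millis as text

    BoundedDelayFactory(String delayConfig) {
        this.delayConfig = delayConfig;
    }

    @Override
    public WatermarkGenerator<T> create() {
        // Stand-in for the expensive step (e.g. compiling generated code).
        long delay = Long.parseLong(delayConfig);
        return new BoundedDelayGenerator<>(delay);
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        WatermarkGeneratorFactory<String> factory = new BoundedDelayFactory<>("5000");
        WatermarkGenerator<String> generator = factory.create();
        // Prints "watermark: 7000" (12000 - 5000).
        generator.onEvent("event", 12_000L, ts -> System.out.println("watermark: " + ts));
    }
}
```

With an open() method the delay field would have to be non-final and assigned
after construction. If metrics are needed later, they could be passed as a
parameter to create(), as Aljoscha suggested.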

So +1 to the proposal.

Best,
Jark

On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:

> I am fine with that.
>
> Most of the principles seem to be agreed upon. I understand the need to
> support code-generated extractors; we should support most of that already (as
> Aljoscha mentioned, via the factories) and can extend this if needed.
>
> I think that the factory approach supports code-generated extractors in an
> even cleaner way than an extractor with an open/init method.
>
>
> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > We're slightly running out of time. I would propose we vote on the basic
> > principle and remain open to later additions. This feature is quite
> > important to make the new Kafka Source that is developed as part of
> > FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> > the newly added connector.
> >
> > I know that's a bit unorthodox but would everyone be OK with what's
> > currently there and then we iterate?
> >
> > Best,
> > Aljoscha
> >
> > On 11.05.20 13:57, Aljoscha Krettek wrote:
> > > Ah, I meant to write this in my previous email, sorry about that.
> > >
> > > > The WatermarkStrategy, which is basically a factory for a
> > > > WatermarkGenerator, is the replacement for the open() method. This is the
> > > > same strategy that was followed for StreamOperatorFactory, which was
> > > introduced to allow code generation in the Table API [1]. If we need
> > > metrics or other things we would add that as a parameter to the factory
> > > method. What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11974
> > >
> > > On 10.05.20 05:07, Jark Wu wrote:
> > >> Hi,
> > >>
> > > >> Regarding `open()/close()`, I think it's necessary for Table&SQL to
> > > >> compile the generated code.
> > > >> In Table&SQL, the watermark strategy and event timestamp are defined using
> > > >> SQL expressions, and we translate them into generated Java code. If we
> > > >> have `open()/close()`, we don't need lazy initialization.
> > > >> Besides that, I can see a need to report some metrics, e.g. the current
> > > >> watermark, the dirty timestamps (null value), etc.
> > > >> So I think a simple `open()/close()` with a context that provides a
> > > >> MetricGroup is nice and not complex for the first version.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>
> > >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> > >>
> > >>> Thanks, Aljoscha, for picking this up.
> > >>>
> > > >>> I agree with the approach of doing the set of changes proposed here for
> > > >>> now. It already makes things simpler and adds idleness support
> > > >>> everywhere.
> > > >>>
> > > >>> Rich functions and state always add complexity; let's do this in a next
> > > >>> step if we have a really compelling case.
> > >>>
> > >>>
> > > >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> > > >>> wrote:
> > >>>
> > > >>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> > > >>>> basically to turn emitting into a "flatMap". We give the
> > > >>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> > > >>>> decide whether to output a watermark or not and can also mark the output
> > > >>>> as idle. Changing the interface to return a Watermark (as the previous
> > > >>>> watermark assigner interface did) would not allow that flexibility.
> > > >>>>
> > > >>>> Regarding checkpointing the watermark and keeping track of the minimum
> > > >>>> watermark, this would be the responsibility of the framework (or the
> > > >>>> KafkaConsumer in the current implementation). The user-supplied WG does
> > > >>>> not need to make sure the watermark doesn't regress.
> > > >>>>
> > > >>>> Regarding making the WG a "rich function", I can see the potential
> > > >>>> benefit but I also see a lot of pitfalls. For example, how should the
> > > >>>> watermark state be handled in the case of scale-in? It could be made to
> > > >>>> work in the Kafka case by attaching the state to the partition state
> > > >>>> that we keep, but then we have potential backwards compatibility
> > > >>>> problems also for the WM state. Does the WG usually need to keep the
> > > >>>> state, or might it be enough if the state is transient? That is, after
> > > >>>> a restart the WG would lose its histogram, but it would rebuild it
> > > >>>> quickly and you would get back to the same steady state as before.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> On 27.04.20 12:12, David Anderson wrote:
> > > >>>>> Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > > >>>>>
> > > >>>>> I also like the idea of making the Watermark generator a rich function --
> > > >>>>> this should make it more straightforward to implement smarter watermark
> > > >>>>> generators. E.g., one that uses state to keep statistics about the actual
> > > >>>>> out-of-orderness, and uses those statistics to implement a variable delay.
> > >>>>>
> > >>>>> David
> > >>>>>
> > > >>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
> > > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Aljoscha,
> > >>>>>>
> > >>>>>> Thanks for opening the discussion!
> > >>>>>>
> > > >>>>>> I have two comments on the FLIP:
> > > >>>>>> 1) We could add lifecycle methods to the Generator, i.e.
> > > >>>>>> open()/close(), probably with a Context as argument. I have not fully
> > > >>>>>> thought this through, but I think that this is more aligned with the
> > > >>>>>> rest of our rich functions. In addition, it would allow us, for
> > > >>>>>> example, to initialize the Watermark value if we decide to checkpoint
> > > >>>>>> the watermark (see [1]). (I also do not know if Table/SQL needs to do
> > > >>>>>> anything in the open().)
> > > >>>>>> 2) Aligned with the above, and with the case where we want to
> > > >>>>>> checkpoint the watermark in mind, I am wondering how we could
> > > >>>>>> implement this in the future. In the FLIP, it is proposed to expose
> > > >>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> > > >>>>>> that there is the implicit contract that watermarks are
> > > >>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> > > >>>>>> assume) a check that compares the last emitted WM against the
> > > >>>>>> provided one and emits it only if it is >=. If not, then we risk
> > > >>>>>> users shooting themselves in the foot if they accidentally forget the
> > > >>>>>> check. Given that the WatermarkGenerator and its caller do not know
> > > >>>>>> if the watermark was finally emitted or not (the
> > > >>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
> > > >>>>>> for checkpointing the WM?
> > >>>>>>
> > > >>>>>> Given this, why not have the methods as:
> > >>>>>>
> > >>>>>> public interface WatermarkGenerator<T> {
> > >>>>>>
> > >>>>>>       Watermark onEvent(T event, long eventTimestamp,
> > WatermarkOutput
> > >>>>>> output);
> > >>>>>>
> > >>>>>>       Watermark onPeriodicEmit(WatermarkOutput output);
> > >>>>>> }
> > >>>>>>
> > > >>>>>> and the caller will be the one enforcing any invariants, such as
> > > >>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> > > >>>>>> anything that is needed, as it will have complete knowledge of
> > > >>>>>> whether the WM was emitted or not.
> > >>>>>>
> > >>>>>> What do you think?
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Kostas
> > >>>>>>
> > >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>>>>>
> > > >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
> > > >>>>>> wrote:
> > >>>>>>>
> > > >>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification.
> > > >>>>>>> We have already considered this FLIP in the interfaces for FLIP-95 [1]
> > > >>>>>>> and look forward to updating to the new unified watermark generators
> > > >>>>>>> once FLIP-126 has been accepted.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> [1] https://github.com/apache/flink/pull/11692
> > >>>>>>>
> > >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>>>>>> Hi Everyone!
> > >>>>>>>>
> > > >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> > > >>>>>>>> separate) Watermark Assigners" [1]. This work was started by Stephan
> > > >>>>>>>> in an experimental branch. I expanded on that work to provide a PoC
> > > >>>>>>>> for the changes proposed in this FLIP: [2].
> > >>>>>>>>
> > > >>>>>>>> Currently, we have two different flavours of Watermark
> > > >>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> > > >>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> > > >>>>>>>> from TimestampAssigner. This means that sources that want to
> > > >>>>>>>> support watermark assignment/extraction in the source need to
> > > >>>>>>>> support two separate interfaces, and we have two operator
> > > >>>>>>>> implementations for the different flavours. Also, this makes
> > > >>>>>>>> features such as generic support for idleness detection more
> > > >>>>>>>> complicated to implement because we again have to support two
> > > >>>>>>>> types of watermark assigners.
> > >>>>>>>>
> > > >>>>>>>> In this FLIP we propose two things:
> > > >>>>>>>>
> > > >>>>>>>> 1) Unify the Watermark Assigners into one interface,
> > > >>>>>>>> WatermarkGenerator
> > > >>>>>>>> 2) Separate this new interface from the TimestampAssigner
> > > >>>>>>>>
> > > >>>>>>>> The motivation for the first is to simplify future implementations
> > > >>>>>>>> and reduce code duplication. The motivation for the second point is
> > > >>>>>>>> again code deduplication: most assigners currently have to extend
> > > >>>>>>>> from some base timestamp extractor or duplicate the extraction
> > > >>>>>>>> logic, or users have to override an abstract method of the
> > > >>>>>>>> watermark assigner to provide the timestamp extraction logic.
> > >>>>>>>>
> > > >>>>>>>> Additionally, we propose to add a generic wrapping
> > > >>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
> > > >>>>>>>> mark a stream/partition as idle if no data arrives within a
> > > >>>>>>>> configured timeout.
> > >>>>>>>>
> > > >>>>>>>> The "unify and separate" part refers to the fact that we want to
> > > >>>>>>>> unify punctuated and periodic assigners but at the same time split
> > > >>>>>>>> the timestamp assigner from the watermark generator.
> > >>>>>>>>
> > >>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> > >>>>>>>> your feedback.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Aljoscha
> > >>>>>>>>
> > >>>>>>>> [1]
> > >>>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> >
> >
>
