Thanks, Aljoscha, for picking this up.

I agree with the approach of making the set of changes proposed here for
now. It already makes things simpler and adds idleness support everywhere.

Rich functions and state always add complexity; let's do this as a next
step if we have a really compelling case.


On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> basically to turn emitting into a "flatMap". We give the
> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> decide whether to output a watermark or not and can also mark the output
> as idle. Changing the interface to return a Watermark (as the previous
> watermark assigner interface did) would not allow that flexibility.
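
A minimal sketch of the collector style described above, assuming simplified stand-ins for Watermark, WatermarkOutput and WatermarkGenerator (the real Flink types may differ in package and detail). The hypothetical MarkerWatermarks generator emits only when an event carries a marker, so most calls produce no watermark at all, which a single returned Watermark could not express as directly:

```java
// Simplified stand-ins for the FLIP-126 interfaces (assumed, not Flink's code).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical punctuated-style generator: events whose payload starts with
// "WM" act as markers; all other events and all periodic calls emit nothing.
final class MarkerWatermarks implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.startsWith("WM")) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // A purely punctuated generator has nothing to do here.
    }
}
```

A periodic generator would do the opposite: record state in onEvent() and emit in onPeriodicEmit(). Both fit the same interface.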
>
> Regarding checkpointing the watermark and keeping track of the minimum
> watermark, this would be the responsibility of the framework (or the
> KafkaConsumer in the current implementation). The user-supplied WG does
> not need to make sure the watermark doesn't regress.
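
To illustrate that framework-side responsibility, here is a sketch of a hypothetical MonotonicWatermarkOutput: it wraps the real output, drops watermarks that would regress, and remembers the highest forwarded value as the candidate to checkpoint. The types are simplified stand-ins (assumed, not Flink's actual classes), and tracking the minimum across Kafka partitions would be a separate layer on top of this:

```java
// Simplified stand-ins (assumed, not Flink's actual classes).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

// Hypothetical framework-side wrapper handed to the user's generator.
final class MonotonicWatermarkOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long currentWatermark = Long.MIN_VALUE;

    MonotonicWatermarkOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(Watermark watermark) {
        // Forward only watermarks that advance; regressions and repeats are dropped.
        if (watermark.timestamp > currentWatermark) {
            currentWatermark = watermark.timestamp;
            downstream.emitWatermark(watermark);
        }
    }

    @Override
    public void markIdle() {
        downstream.markIdle();
    }

    // The value a checkpoint would store ...
    long snapshotWatermark() {
        return currentWatermark;
    }

    // ... and how it would be put back after a restore.
    void restoreWatermark(long restored) {
        currentWatermark = restored;
    }
}
```

With this in place, a user-supplied generator can emit freely; the wrapper alone guarantees that nothing downstream ever sees a watermark go backwards.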
>
> Regarding making the WG a "rich function", I can see the potential
> benefit, but I also see a lot of pitfalls. For example, how should the
> watermark state be handled in the case of scale-in? It could be made to
> work in the Kafka case by attaching the state to the partition state
> that we keep, but then we would also have potential backwards-compatibility
> problems for the WM state. Does the WG usually need to keep the
> state, or might it be enough if the state is transient? That is, after
> a restart the WG would lose its histogram, but it would rebuild it
> quickly and you would get back to the same steady state as before.
>
> Best,
> Aljoscha
>
> On 27.04.20 12:12, David Anderson wrote:
> > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> >
> > I also like the idea of making the Watermark generator a rich function --
> > this should make it more straightforward to implement smarter watermark
> > generators, e.g., one that uses state to keep statistics about the actual
> > out-of-orderness and uses those statistics to implement a variable delay.
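
A rough sketch of such a variable-delay generator, assuming simplified stand-in types and using the largest lateness observed so far as the "statistic" (a histogram or percentile would be a natural refinement). The hypothetical AdaptiveDelayWatermarks keeps its statistics in plain fields, i.e. the transient option raised in the reply above: after a restart it starts from zero and re-learns the delay from incoming events.

```java
// Simplified stand-ins (assumed, not Flink's actual classes).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator whose delay adapts to the observed out-of-orderness.
final class AdaptiveDelayWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;
    private long observedDelay = 0L; // largest lateness seen so far (transient)

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            observedDelay = Math.max(observedDelay, maxTimestamp - eventTimestamp);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events yet, nothing to base a watermark on
        }
        // Subtract the learned delay; the extra -1 means an event exactly
        // observedDelay behind the maximum is still not behind the watermark.
        output.emitWatermark(new Watermark(maxTimestamp - observedDelay - 1));
    }
}
```

Note that a growing delay can make this generator propose a smaller watermark than before; under the proposal, the framework rather than the generator would filter out such regressions.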
> >
> > David
> >
> > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >
> >> Hi Aljoscha,
> >>
> >> Thanks for opening the discussion!
> >>
> >> I have two comments on the FLIP:
> >> 1) We could add lifecycle methods to the Generator, i.e. open()/
> >> close(), probably with a Context as argument. I have not fully thought
> >> this through, but I think that this is more aligned with the rest of
> >> our rich functions. In addition, it would allow us, for example, to
> >> initialize the Watermark value if we decide to checkpoint the
> >> watermark (see [1]). (I also do not know whether Table/SQL needs to do
> >> anything in open().)
> >> 2) Aligned with the above, and with the case where we want to
> >> checkpoint the watermark in mind, I am wondering how we could
> >> implement this in the future. In the FLIP, it is proposed to expose
> >> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> >> that there is the implicit contract that watermarks are
> >> non-decreasing, the WatermarkOutput#emitWatermark() will (I
> >> assume) have a check that compares the last emitted WM against the
> >> provided one and emits it only if it is >=. Otherwise, we risk
> >> users shooting themselves in the foot if they accidentally
> >> forget the check. Given that the WatermarkGenerator and its caller do
> >> not know whether the watermark was finally emitted or not (the
> >> WatermarkOutput#emitWatermark() returns void), who will be responsible
> >> for checkpointing the WM?
> >>
> >> Given this, why not define the methods as:
> >>
> >> public interface WatermarkGenerator<T> {
> >>
> >>      Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>
> >>      Watermark onPeriodicEmit(WatermarkOutput output);
> >> }
> >>
> >> and the caller will be the one enforcing any invariants, such as
> >> non-decreasing watermarks. In this way, the caller can checkpoint
> >> anything that is needed, as it will have complete knowledge of whether
> >> the WM was emitted or not.
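
A minimal sketch of the caller side under this return-based variant, assuming the signatures above, that null means "no watermark", and that the WatermarkOutput parameter remains only for signalling idleness (that last point is a guess; the reply does not say). GeneratorDriver and EchoTimestamps are hypothetical names:

```java
import java.util.function.LongConsumer;

// Simplified stand-ins (assumed). Here the output only signals idleness.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void markIdle();
}

// The return-based variant; null means "no watermark".
interface WatermarkGenerator<T> {
    Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);

    Watermark onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator that naively proposes every event timestamp,
// even when events arrive out of order.
final class EchoTimestamps<T> implements WatermarkGenerator<T> {
    @Override
    public Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        return new Watermark(eventTimestamp);
    }

    @Override
    public Watermark onPeriodicEmit(WatermarkOutput output) {
        return null;
    }
}

// Hypothetical caller that enforces non-decreasing watermarks and therefore
// knows exactly what was last emitted, i.e. what to checkpoint.
final class GeneratorDriver<T> {
    private final WatermarkGenerator<T> generator;
    private final WatermarkOutput output;
    private final LongConsumer downstream;
    private long lastEmitted = Long.MIN_VALUE;

    GeneratorDriver(WatermarkGenerator<T> generator, WatermarkOutput output, LongConsumer downstream) {
        this.generator = generator;
        this.output = output;
        this.downstream = downstream;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp, output));
    }

    void periodicEmit() {
        maybeEmit(generator.onPeriodicEmit(output));
    }

    private void maybeEmit(Watermark candidate) {
        if (candidate != null && candidate.timestamp > lastEmitted) {
            lastEmitted = candidate.timestamp;
            downstream.accept(candidate.timestamp);
        }
    }

    long checkpointedWatermark() {
        return lastEmitted;
    }
}
```

The trade-off is visible in the sketch: the caller gains full knowledge of what was emitted, but a generator can only propose one watermark per call.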
> >>
> >> What do you think?
> >>
> >> Cheers,
> >> Kostas
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>
> >> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> >>>
> >>> Thanks for the proposal, Aljoscha. This is a very useful unification. We
> >>> have already considered this FLIP in the interfaces for FLIP-95 [1] and
> >>> look forward to updating to the new unified watermark generators once
> >>> FLIP-126 has been accepted.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> [1] https://github.com/apache/flink/pull/11692
> >>>
> >>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>> Hi Everyone!
> >>>>
> >>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> >>>> Watermark Assigners" [1]. This work was started by Stephan in an
> >>>> experimental branch. I expanded on that work to provide a PoC for the
> >>>> changes proposed in this FLIP: [2].
> >>>>
> >>>> Currently, we have two different flavours of Watermark
> >>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>> from TimestampAssigner. This means that sources that want to support
> >>>> watermark assignment/extraction in the source need to support two
> >>>> separate interfaces, and we have two operator implementations for the
> >>>> different flavours. Also, this makes features such as generic support
> >>>> for idleness detection more complicated to implement because we again
> >>>> have to support two types of watermark assigners.
> >>>>
> >>>> In this FLIP we propose two things:
> >>>>
> >>>> 1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
> >>>> 2) Separate this new interface from the TimestampAssigner.
> >>>>
> >>>> The motivation for the first is to simplify future implementations and
> >>>> reduce code duplication. The motivation for the second point is again
> >>>> code deduplication: most assigners currently have to extend from some
> >>>> base timestamp extractor or duplicate the extraction logic, or users
> >>>> have to override an abstract method of the watermark assigner to
> >>>> provide the timestamp extraction logic.
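
A sketch of what the separation buys, assuming simplified stand-ins: timestamp extraction becomes a small functional interface, and a generic generator such as the hypothetical BoundedOutOfOrdernessGenerator only ever sees timestamps, so it can be reused with any record type and any extraction logic. AssignerAndGenerator is a hypothetical name for the glue a framework would provide:

```java
// Simplified stand-ins (assumed, not Flink's actual classes).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Timestamp extraction, independent of watermark generation.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Generic periodic generator: works for any T because it only looks at timestamps.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxDelay;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxDelay) {
        this.maxDelay = maxDelay;
        // Start low enough that the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxDelay + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - maxDelay - 1));
    }
}

// Hypothetical glue: extract the timestamp first, then feed it to the generator.
final class AssignerAndGenerator<T> {
    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;

    AssignerAndGenerator(TimestampAssigner<T> assigner, WatermarkGenerator<T> generator) {
        this.assigner = assigner;
        this.generator = generator;
    }

    long onRecord(T element, long recordTimestamp, WatermarkOutput output) {
        long timestamp = assigner.extractTimestamp(element, recordTimestamp);
        generator.onEvent(element, timestamp, output);
        return timestamp;
    }

    void onPeriodicEmit(WatermarkOutput output) {
        generator.onPeriodicEmit(output);
    }
}
```

Because the assigner is a single-method interface, users can supply it as a lambda without subclassing any watermark-related base class.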
> >>>>
> >>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> >>>> that provides idleness detection, i.e. it can mark a stream/partition as
> >>>> idle if no data arrives within a configured timeout.
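
A minimal sketch of such a wrapper, assuming simplified stand-in types, an injectable millisecond clock (a LongSupplier) for testability, and that a later emitted watermark implicitly ends idleness downstream. IdlenessWrapper is a hypothetical name, and the actual implementation may detect the timeout differently:

```java
import java.util.function.LongSupplier;

// Simplified stand-ins (assumed, not Flink's actual classes).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical wrapper that adds idleness detection to any generator.
final class IdlenessWrapper<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // current time in milliseconds
    private long lastActivity;
    private boolean idle = false;

    IdlenessWrapper(WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastActivity = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = clock.getAsLong();
        idle = false; // new data: resume normal watermark emission
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastActivity >= idleTimeoutMillis) {
            if (!idle) { // signal idleness once per idle period
                idle = true;
                output.markIdle();
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

Because the wrapper relies only on the generator interface, it works unchanged for punctuated and periodic generators alike, which is where the unification pays off.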
> >>>>
> >>>> The "unify and separate" part refers to the fact that we want to unify
> >>>> punctuated and periodic assigners but at the same time split the
> >>>> timestamp assigner from the watermark generator.
> >>>>
> >>>> Please find more details in the FLIP [1]. Looking forward to
> >>>> your feedback.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>
> >>>>
> >>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> >>>
> >>
> >
>
>
