Hi,

Regarding `open()/close()`, I think they are necessary for Table&SQL to
compile the generated code.
In Table&SQL, the watermark strategy and the event timestamp are defined
using SQL expressions, and we translate these expressions into generated
Java code. If we have `open()/close()`, we don't need lazy initialization.
Besides that, I can see a need to report some metrics, e.g. the current
watermark, the number of dirty timestamps (null values), etc.
So I think a simple `open()/close()` with a context that provides access to
a MetricGroup is nice and not too complex for the first version.
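A minimal sketch of the lifecycle described above, assuming simplified stand-in types: `GeneratorContext`, `LifecycleWatermarkGenerator`, `MapContext`, and `SqlLikeGenerator` are hypothetical names for illustration, not Flink classes, and a plain counter stands in for a `MetricGroup`. It shows one-time setup in `open()` instead of lazy initialization, plus a metric for NULL ("dirty") timestamps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical context handed to open(); a stand-in for a context that
// exposes a MetricGroup, not Flink's actual API.
interface GeneratorContext {
    AtomicLong counter(String name);
}

// Hypothetical generator with lifecycle methods. The timestamp is a nullable
// Long because a SQL expression can evaluate to NULL.
interface LifecycleWatermarkGenerator<T> {
    default void open(GeneratorContext ctx) {}
    void onEvent(T event, Long eventTimestamp);
    default void close() {}
}

// Simple in-memory context for illustration.
class MapContext implements GeneratorContext {
    final Map<String, AtomicLong> counters = new HashMap<>();

    @Override
    public AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, k -> new AtomicLong());
    }
}

class SqlLikeGenerator implements LifecycleWatermarkGenerator<String> {
    private AtomicLong dirtyTimestamps;
    private long maxTimestamp;

    @Override
    public void open(GeneratorContext ctx) {
        // One-time setup happens here instead of lazily on the first event;
        // in Table&SQL this is where generated code could be compiled.
        dirtyTimestamps = ctx.counter("dirtyTimestamps");
        maxTimestamp = Long.MIN_VALUE;
    }

    @Override
    public void onEvent(String event, Long eventTimestamp) {
        if (eventTimestamp == null) {
            dirtyTimestamps.incrementAndGet(); // report NULL timestamps as a metric
            return;
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long maxTimestamp() {
        return maxTimestamp;
    }
}
```

With `open()` guaranteed to run first, `onEvent` needs no "is it initialized yet?" check.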

Best,
Jark



On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:

> Thanks, Aljoscha, for picking this up.
>
> I agree with the approach of making the set of changes proposed here for
> now. It already makes things simpler and adds idleness support everywhere.
>
> Rich functions and state always add complexity; let's do this in a later
> step, if we have a really compelling case.
>
>
> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> > basically to turn emitting into a "flatMap". We give the
> > WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> > decide whether to output a watermark or not and can also mark the output
> > as idle. Changing the interface to return a Watermark (as the previous
> > watermark assigner interface did) would not allow that flexibility.
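To make the "flatMap" style concrete, here is a sketch with simplified stand-ins for `WatermarkOutput` and `WatermarkGenerator` (watermarks are plain `long`s; `MarkerGenerator` and `RecordingOutput` are hypothetical). In a single call the generator may emit nothing, emit a watermark, or mark the output idle, which a single return value cannot express.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the WatermarkOutput "collector".
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

// Simplified stand-in for the unified generator interface.
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Emits only for marker events and marks the output idle if no event
// arrived between two periodic calls.
class MarkerGenerator implements WatermarkGenerator<String> {
    private boolean sawEventSinceLastPeriodic = false;

    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        sawEventSinceLastPeriodic = true;
        if (event.startsWith("marker")) {
            output.emitWatermark(eventTimestamp); // zero or one watermark per call
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (!sawEventSinceLastPeriodic) {
            output.markIdle(); // not expressible as a plain return value
        }
        sawEventSinceLastPeriodic = false;
    }
}

// Records what the generator sent to the output.
class RecordingOutput implements WatermarkOutput {
    final List<String> log = new ArrayList<>();

    @Override
    public void emitWatermark(long watermark) { log.add("wm:" + watermark); }

    @Override
    public void markIdle() { log.add("idle"); }
}
```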
> >
> > Regarding checkpointing the watermark and keeping track of the minimum
> > watermark, this would be the responsibility of the framework (or the
> > KafkaConsumer in the current implementation). The user-supplied WG does
> > not need to make sure the watermark doesn't regress.
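A sketch of how the framework side could own the non-regression guarantee, assuming a simplified `WatermarkOutput` stand-in; `NonRegressingOutput` and `ListOutput` are hypothetical names. Because the wrapper sees every emitted value, it also knows the last forwarded watermark, which is one possible answer to the question of who checkpoints it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the output interface.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

// Framework-side wrapper: user code may emit any value; only advancing
// watermarks reach downstream.
class NonRegressingOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;

    NonRegressingOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermark) {
        if (watermark > lastEmitted) { // regressing or repeated values are dropped
            lastEmitted = watermark;
            downstream.emitWatermark(watermark);
        }
    }

    @Override
    public void markIdle() {
        downstream.markIdle();
    }

    long lastEmitted() {
        return lastEmitted; // candidate value for checkpointed state
    }
}

// Collects forwarded watermarks, standing in for the downstream operator.
class ListOutput implements WatermarkOutput {
    final List<Long> emitted = new ArrayList<>();
    int idleCalls = 0;

    @Override
    public void emitWatermark(long watermark) { emitted.add(watermark); }

    @Override
    public void markIdle() { idleCalls++; }
}
```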
> >
> > Regarding making the WG a "rich function", I can see the potential
> > benefit, but I also see a lot of pitfalls. For example, how should the
> > watermark state be handled in the case of scale-in? It could be made to
> > work in the Kafka case by attaching the state to the partition state
> > that we keep, but then we also have potential backwards-compatibility
> > problems for the WM state. Does the WG usually need to keep its state,
> > or might it be enough if the state is transient? That is, after a
> > restart the WG would lose its histogram, but it would rebuild it
> > quickly and you would get back to the same steady state as before.
> >
> > Best,
> > Aljoscha
> >
> > On 27.04.20 12:12, David Anderson wrote:
> > > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >
> > > I also like the idea of making the Watermark generator a rich function;
> > > this should make it more straightforward to implement smarter watermark
> > > generators, e.g. one that uses state to keep statistics about the actual
> > > out-of-orderness and uses those statistics to implement a variable
> > > delay.
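The variable-delay idea can be sketched as follows, under assumptions: the interfaces are simplified stand-ins, `AdaptiveDelayGenerator` is hypothetical, and the "statistics" are reduced to the largest lateness seen so far rather than a full histogram. The fields are ordinary (transient) state, so after a restart they start empty and are rebuilt from incoming data, as Aljoscha suggests.

```java
// Simplified stand-ins; watermarks are plain longs.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Derives its delay from observed out-of-orderness instead of a fixed bound.
// Both fields are transient: nothing here is checkpointed.
class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;
    private long maxObservedLateness = 0;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        } else {
            // An out-of-order event: remember how far behind it was.
            maxObservedLateness = Math.max(maxObservedLateness, maxTimestamp - eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // nothing seen yet
        }
        output.emitWatermark(maxTimestamp - maxObservedLateness - 1);
    }
}
```

This watermark moves backwards when a later event reveals larger out-of-orderness; under the proposal, the framework rather than the generator would filter out such regressions.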
> > >
> > > David
> > >
> > > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for opening the discussion!
> > >>
> > >> I have two comments on the FLIP:
> > >> 1) We could add lifecycle methods to the Generator, i.e. open()/
> > >> close(), probably with a Context as argument. I have not fully thought
> > >> this through, but I think this is more aligned with the rest of
> > >> our rich functions. In addition, it would allow us, for example, to
> > >> initialize the watermark value if we decide to checkpoint the
> > >> watermark (see [1]). (I also do not know whether Table/SQL needs to do
> > >> anything in open().)
> > >> 2) Aligned with the above, and with the case where we want to
> > >> checkpoint the watermark in mind, I am wondering how we could
> > >> implement this in the future. The FLIP proposes to expose the
> > >> WatermarkOutput in the methods of the WatermarkGenerator. Given the
> > >> implicit contract that watermarks are non-decreasing,
> > >> WatermarkOutput#emitWatermark() will (I assume) have a check that
> > >> compares the last emitted WM against the provided one and emits it
> > >> only if it is >=. If there is no such check, we risk users shooting
> > >> themselves in the foot if they accidentally forget it. Given that
> > >> neither the WatermarkGenerator nor its caller knows whether the
> > >> watermark was finally emitted (WatermarkOutput#emitWatermark returns
> > >> void), who will be responsible for checkpointing the WM?
> > >>
> > >> Given this, why not define the methods as:
> > >>
> > >> public interface WatermarkGenerator<T> {
> > >>
> > >>      Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > >>
> > >>      Watermark onPeriodicEmit(WatermarkOutput output);
> > >> }
> > >>
> > >> and let the caller enforce any invariants, such as non-decreasing
> > >> watermarks? This way, the caller can checkpoint anything that is
> > >> needed, as it has complete knowledge of whether the WM was emitted
> > >> or not.
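Kostas's alternative can be sketched with the caller enforcing the invariant. Assumptions: a nullable `Long` replaces the `Watermark` return type (null means "no watermark"), the `WatermarkOutput` parameter is dropped for brevity, and `ReturningWatermarkGenerator`, `NaiveGenerator`, and `GeneratorDriver` with `snapshotState()`/`restoreState()` are hypothetical names.

```java
import java.util.function.LongConsumer;

// Hypothetical return-value variant of the generator.
interface ReturningWatermarkGenerator<T> {
    Long onEvent(T event, long eventTimestamp);
    Long onPeriodicEmit();
}

// Deliberately naive generator: its result can go backwards.
class NaiveGenerator implements ReturningWatermarkGenerator<String> {
    @Override
    public Long onEvent(String event, long eventTimestamp) {
        return eventTimestamp - 5;
    }

    @Override
    public Long onPeriodicEmit() {
        return null; // no periodic watermark
    }
}

// The caller owns the non-decreasing invariant, so it knows exactly which
// watermark was last emitted and can snapshot it.
class GeneratorDriver<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final LongConsumer downstream;
    private long lastEmitted = Long.MIN_VALUE;

    GeneratorDriver(ReturningWatermarkGenerator<T> generator, LongConsumer downstream) {
        this.generator = generator;
        this.downstream = downstream;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void periodicEmit() {
        maybeEmit(generator.onPeriodicEmit());
    }

    private void maybeEmit(Long candidate) {
        if (candidate != null && candidate > lastEmitted) {
            lastEmitted = candidate;
            downstream.accept(candidate);
        }
    }

    long snapshotState() {
        return lastEmitted;
    }

    void restoreState(long watermark) {
        lastEmitted = watermark;
    }
}
```

Since this sketch drops the `WatermarkOutput` parameter, it has no way to mark the output idle; keeping that parameter, as in the proposal, would restore that option.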
> > >>
> > >> What do you think?
> > >>
> > >> Cheers,
> > >> Kostas
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>
> > >> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> > >>>
> > >>> Thanks for the proposal, Aljoscha. This is a very useful unification.
> > >>> We have already considered this FLIP in the interfaces for FLIP-95 [1]
> > >>> and look forward to updating to the new unified watermark generators
> > >>> once FLIP-126 has been accepted.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/11692
> > >>>
> > >>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>> Hi Everyone!
> > >>>>
> > >>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> > >>>> Watermark Assigners" [1]. This work was started by Stephan in an
> > >>>> experimental branch. I expanded on that work to provide a PoC for the
> > >>>> changes proposed in this FLIP: [2].
> > >>>>
> > >>>> Currently, we have two different flavours of Watermark
> > >>>> Assigners: AssignerWithPunctuatedWatermarks
> > >>>> and AssignerWithPeriodicWatermarks. Both of them extend
> > >>>> from TimestampAssigner. This means that sources that want to support
> > >>>> watermark assignment/extraction in the source need to support two
> > >>>> separate interfaces, and we need two operator implementations for the
> > >>>> different flavours. Also, this makes features such as generic support
> > >>>> for idleness detection more complicated to implement, because we again
> > >>>> have to support two types of watermark assigners.
> > >>>>
> > >>>> In this FLIP we propose two things:
> > >>>>
> > >>>> 1) Unify the Watermark Assigners into one interface, WatermarkGenerator.
> > >>>> 2) Separate this new interface from the TimestampAssigner.
> > >>>>
> > >>>> The motivation for the first is to simplify future implementations and
> > >>>> reduce code duplication. The motivation for the second point is again
> > >>>> code deduplication: most assigners currently have to extend from some
> > >>>> base timestamp extractor or duplicate the extraction logic, or users
> > >>>> have to override an abstract method of the watermark assigner to
> > >>>> provide the timestamp extraction logic.
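A sketch of "unify and separate" using simplified stand-ins (not Flink's classes): `TimestampAssigner` is a separate interface, and one `WatermarkGenerator` interface covers both the periodic flavour (`BoundedOutOfOrderness`) and the punctuated flavour (`MarkerWatermarks`). `Reading`, `SourceLoop`, and the use of `Long.MIN_VALUE` as "no record timestamp" are assumptions for illustration.

```java
import java.util.List;

// Simplified stand-ins; watermarks are plain longs.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Timestamp extraction lives in its own interface...
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// ...and one generator interface covers both flavours: punctuated generators
// emit from onEvent, periodic ones from onPeriodicEmit.
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Periodic flavour: bounded out-of-orderness. It never inspects the event.
class BoundedOutOfOrderness<T> implements WatermarkGenerator<T> {
    private final long maxDelay;
    private long maxTimestamp;

    BoundedOutOfOrderness(long maxDelay) {
        this.maxDelay = maxDelay;
        this.maxTimestamp = Long.MIN_VALUE + maxDelay + 1; // avoids underflow below
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - maxDelay - 1);
    }
}

// Hypothetical event type with a marker flag.
final class Reading {
    final long time;
    final boolean marker;

    Reading(long time, boolean marker) {
        this.time = time;
        this.marker = marker;
    }
}

// Punctuated flavour: emits only when a marker event arrives.
class MarkerWatermarks implements WatermarkGenerator<Reading> {
    @Override
    public void onEvent(Reading event, long eventTimestamp, WatermarkOutput output) {
        if (event.marker) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // nothing to do periodically
    }
}

final class SourceLoop {
    // One code path for both flavours: extract the timestamp, hand it to the
    // generator, then trigger one periodic emission at the end.
    static <T> void feed(List<T> events, TimestampAssigner<T> assigner,
                         WatermarkGenerator<T> generator, WatermarkOutput output) {
        for (T event : events) {
            long ts = assigner.extractTimestamp(event, Long.MIN_VALUE); // assumed "no record timestamp"
            generator.onEvent(event, ts, output);
        }
        generator.onPeriodicEmit(output);
    }
}
```

Because the periodic generator only looks at the extracted timestamp, the same class works for any event type; only the assigner is specific to `Reading`.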
> > >>>>
> > >>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> > >>>> that provides idleness detection, i.e. it can mark a stream/partition
> > >>>> as idle if no data arrives within a configured timeout.
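A sketch of such a wrapping generator, assuming simplified stand-ins for `WatermarkOutput` and `WatermarkGenerator`; `IdlenessWrapper` and `RecordingOutput` are hypothetical, and the clock is injected as a `LongSupplier` (returning milliseconds) so the timeout can be tested without sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified stand-ins for the interfaces discussed in the thread.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Wraps any generator and marks the output idle once no event has arrived
// for idleTimeoutMillis, as measured by the injected clock.
class IdlenessWrapper<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> inner;
    private final long idleTimeoutMillis;
    private final LongSupplier clock;
    private long lastEventTime;
    private boolean idle = false;

    IdlenessWrapper(WatermarkGenerator<T> inner, long idleTimeoutMillis, LongSupplier clock) {
        this.inner = inner;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong(); // new data resets the idleness timer
        idle = false;
        inner.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        boolean timedOut = clock.getAsLong() - lastEventTime >= idleTimeoutMillis;
        if (!timedOut) {
            inner.onPeriodicEmit(output);
        } else if (!idle) {
            output.markIdle(); // signal idleness once, not on every tick
            idle = true;
        }
    }
}

// Records calls on the output for inspection.
class RecordingOutput implements WatermarkOutput {
    final List<String> log = new ArrayList<>();

    @Override
    public void emitWatermark(long watermark) { log.add("wm:" + watermark); }

    @Override
    public void markIdle() { log.add("idle"); }
}
```

In this sketch the wrapped generator's periodic output is suppressed while the stream is idle, and the next event resets the timer.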
> > >>>>
> > >>>> The "unify and separate" part refers to the fact that we want to unify
> > >>>> punctuated and periodic assigners but at the same time split the
> > >>>> timestamp assigner from the watermark generator.
> > >>>>
> > >>>> Please find more details in the FLIP [1]. Looking forward to
> > >>>> your feedback.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>>
> > >>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> > >>>
> > >>
> > >
> >
> >
>
