Hi,

Regarding `open()/close()`, I think they are necessary for Table&SQL so that it can compile the generated code. In Table&SQL, the watermark strategy and the event timestamp are defined using SQL expressions, which we translate into generated Java code. If we have `open()/close()`, we can compile that code there and don't need lazy initialization. Besides that, I can see a need to report some metrics, e.g. the current watermark, the number of dirty timestamps (null values), etc. So I think a simple `open()/close()` with a context that provides access to the MetricGroup would be nice and not too complex for the first version.
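To make this a bit more concrete, here is a rough sketch of the shape I have in mind. Everything below is a simplified stand-in written for this mail and not the actual Flink classes: `GeneratorContext`, `SimpleMetricGroup` and `NullCountingGenerator` are hypothetical names, and the `WatermarkOutput` here only takes a plain `long`. The `void`-returning `onEvent`/`onPeriodicEmit` follow the "collector" style described in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the output "collector" discussed in the thread.
interface WatermarkOutput {
    void emitWatermark(long timestamp);
}

// Hypothetical minimal counter registry standing in for a MetricGroup.
class SimpleMetricGroup {
    private final Map<String, Long> counters = new HashMap<>();

    void inc(String name) {
        counters.merge(name, 1L, Long::sum);
    }

    long get(String name) {
        return counters.getOrDefault(name, 0L);
    }
}

// Hypothetical context passed to open(); it only exposes metrics here.
interface GeneratorContext {
    SimpleMetricGroup getMetricGroup();
}

// Generator interface with the proposed lifecycle methods added.
interface WatermarkGenerator<T> {
    default void open(GeneratorContext context) {}

    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);

    default void close() {}
}

// Bounded-delay generator; the event is a nullable rowtime value.
class NullCountingGenerator implements WatermarkGenerator<Long> {
    private final long maxDelay;
    private SimpleMetricGroup metrics;
    private long maxTimestamp = Long.MIN_VALUE;

    NullCountingGenerator(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public void open(GeneratorContext context) {
        // Generated code could be compiled here instead of lazily.
        this.metrics = context.getMetricGroup();
    }

    @Override
    public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
        if (event == null) {
            metrics.inc("dirtyTimestamps"); // count null timestamps
            return;
        }
        maxTimestamp = Math.max(maxTimestamp, event);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp - maxDelay);
        }
    }
}

class OpenCloseSketch {
    // Returns {last emitted watermark, number of dirty timestamps}.
    static long[] run(Long[] rowtimes, long maxDelay) {
        SimpleMetricGroup metrics = new SimpleMetricGroup();
        long[] lastWatermark = {Long.MIN_VALUE};
        WatermarkOutput out = wm -> lastWatermark[0] = wm;

        NullCountingGenerator generator = new NullCountingGenerator(maxDelay);
        generator.open(() -> metrics);
        for (Long t : rowtimes) {
            generator.onEvent(t, t == null ? Long.MIN_VALUE : t, out);
        }
        generator.onPeriodicEmit(out);
        generator.close();
        return new long[] {lastWatermark[0], metrics.get("dirtyTimestamps")};
    }

    public static void main(String[] args) {
        long[] r = run(new Long[] {1000L, null, 3000L, 2000L}, 500L);
        System.out.println("watermark=" + r[0] + " dirty=" + r[1]);
    }
}
```

With the rowtimes 1000, null, 3000, 2000 and a delay of 500, the sketch emits a watermark of 2500 and counts one dirty timestamp. The point is only that `open()` gives the generator a natural place for one-time setup and metric registration; the real context and metric types would of course be whatever the FLIP settles on.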
Best,
Jark

On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:

> Thanks, Aljoscha, for picking this up.
>
> I agree with the approach of doing the here proposed set of changes for
> now. It already makes things simpler and adds idleness support everywhere.
>
> Rich functions and state always add complexity, let's do this in a next
> step, if we have a really compelling case.
>
>
> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Regarding the WatermarkGenerator (WG) interface itself. The proposal is
> > basically to turn emitting into a "flatMap", we give the
> > WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> > decide whether to output a watermark or not and can also mark the output
> > as idle. Changing the interface to return a Watermark (as the previous
> > watermark assigner interface did) would not allow that flexibility.
> >
> > Regarding checkpointing the watermark and keeping track of the minimum
> > watermark, this would be the responsibility of the framework (or the
> > KafkaConsumer in the current implementation). The user-supplied WG does
> > not need to make sure the watermark doesn't regress.
> >
> > Regarding making the WG a "rich function", I can see the potential
> > benefit but I also see a lot of pitfalls. For example, how should the
> > watermark state be handled in the case of scale-in? It could be made to
> > work in the Kafka case by attaching the state to the partition state
> > that we keep, but then we have potential backwards compatibility
> > problems also for the WM state. Does the WG usually need to keep the
> > state or might it be enough if the state is transient, i.e. if you have
> > a restart the WG would loose its histogram but it would rebuild it
> > quickly and you would get back to the same steady state as before.
> >
> > Best,
> > Aljoscha
> >
> > On 27.04.20 12:12, David Anderson wrote:
> > > Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >
> > > I also like the idea of making the Watermark generator a rich function --
> > > this should make it more straightforward to implement smarter watermark
> > > generators. Eg, one that uses state to keep statistics about the actual
> > > out-of-orderness, and uses those statistics to implement a variable delay.
> > >
> > > David
> > >
> > > On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for opening the discussion!
> > >>
> > >> I have two comments on the FLIP:
> > >> 1) we could add lifecycle methods to the Generator, i.e. open()/
> > >> close(), probably with a Context as argument: I have not fully thought
> > >> this through but I think that this is more aligned with the rest of
> > >> our rich functions. In addition, it will allow, for example, to
> > >> initialize the Watermark value, if we decide to checkpoint the
> > >> watermark (see [1]) (I also do not know if Table/SQL needs to do
> > >> anything in the open()).
> > >> 2) aligned with the above, and with the case where we want to
> > >> checkpoint the watermark in mind, I am wondering about how we could
> > >> implement this in the future. In the FLIP, it is proposed to expose
> > >> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> > >> that there is the implicit contract that watermarks are
> > >> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> > >> assume) a check that will compare the last emitted WM against the
> > >> provided one, and emit it only if it is >=. If not, then we risk
> > >> having the user shooting himself on the foot if he/she accidentally
> > >> forgets the check.
> > >> Given that the WatermarkGenerator and its caller do
> > >> not know if the watermark was finally emitted or not (the
> > >> WatermarkOutput#emitWatermark returns void), who will be responsible
> > >> for checkpointing the WM?
> > >>
> > >> Given this, why not having the methods as:
> > >>
> > >> public interface WatermarkGenerator<T> {
> > >>
> > >>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > >>
> > >>     Watermark onPeriodicEmit(WatermarkOutput output);
> > >> }
> > >>
> > >> and the caller will be the one enforcing any invariants, such as
> > >> non-decreasing watermarks. In this way, the caller can checkpoint
> > >> anything that is needed as it will have complete knowledge as to if
> > >> the WM was emitted or not.
> > >>
> > >> What do you think?
> > >>
> > >> Cheers,
> > >> Kostas
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>
> > >> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> > >>>
> > >>> Thanks for the proposal Aljoscha. This is a very useful unification. We
> > >>> have considered this FLIP already in the interfaces for FLIP-95 [1] and
> > >>> look forward to update to the new unified watermark generators once
> > >>> FLIP-126 has been accepted.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> [1] https://github.com/apache/flink/pull/11692
> > >>>
> > >>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>> Hi Everyone!
> > >>>>
> > >>>> We would like to start a discussion on "FLIP-126: Unify (and separate)
> > >>>> Watermark Assigners" [1]. This work was started by Stephan in an
> > >>>> experimental branch. I expanded on that work to provide a PoC for the
> > >>>> changes proposed in this FLIP: [2].
> > >>>>
> > >>>> Currently, we have two different flavours of Watermark
> > >>>> Assigners: AssignerWithPunctuatedWatermarks
> > >>>> and AssignerWithPeriodicWatermarks. Both of them extend
> > >>>> from TimestampAssigner.
> > >>>> This means that sources that want to support
> > >>>> watermark assignment/extraction in the source need to support two
> > >>>> separate interfaces, we have two operator implementations for the
> > >>>> different flavours. Also, this makes features such as generic support
> > >>>> for idleness detection more complicated to implemented because we again
> > >>>> have to support two types of watermark assigners.
> > >>>>
> > >>>> In this FLIP we propose two things:
> > >>>>
> > >>>> Unify the Watermark Assigners into one Interface WatermarkGenerator
> > >>>> Separate this new interface from the TimestampAssigner
> > >>>> The motivation for the first is to simplify future implementations and
> > >>>> code duplication. The motivation for the second point is again code
> > >>>> deduplication, most assigners currently have to extend from some base
> > >>>> timestamp extractor or duplicate the extraction logic, or users have to
> > >>>> override an abstract method of the watermark assigner to provide the
> > >>>> timestamp extraction logic.
> > >>>>
> > >>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> > >>>> that provides idleness detection, i.e. it can mark a stream/partition as
> > >>>> idle if no data arrives after a configured timeout.
> > >>>>
> > >>>> The "unify and separate" part refers to the fact that we want to unify
> > >>>> punctuated and periodic assigners but at the same time split the
> > >>>> timestamp assigner from the watermark generator.
> > >>>>
> > >>>> Please find more details in the FLIP [1]. Looking forward to
> > >>>> your feedback.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>>
> > >>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time