Thanks for the explanation. I like the factory pattern for making the member variables immutable and final.
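Here is a minimal sketch of the factory pattern. It uses simplified, hypothetical stand-ins for `WatermarkStrategy`, `WatermarkGenerator`, and `WatermarkOutput`, not Flink's actual signatures. The strategy is a serializable factory. The generator it creates gets its configuration through the constructor and keeps it in a `final` field, so it needs no separate `open()` step. Such a factory could instantiate a code-generated generator class in the same way.

```java
import java.io.Serializable;
import java.time.Duration;

// Simplified, hypothetical stand-ins modeled on the names in this thread;
// these are NOT Flink's actual classes or signatures.
interface WatermarkOutput {
    void emitWatermark(long watermarkMillis);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// The strategy is the serializable factory that is shipped with the job.
// Extra inputs (e.g. a metrics context) could become parameters of this method.
interface WatermarkStrategy<T> extends Serializable {
    WatermarkGenerator<T> createWatermarkGenerator();
}

// All configuration arrives through the constructor and is stored in a final
// field, so the generator is fully initialized without an open() call.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that a periodic emit before any event does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - outOfOrdernessMillis - 1);
    }
}
```

For example, `WatermarkStrategy<String> strategy = () -> new BoundedOutOfOrdernessGenerator<>(Duration.ofSeconds(5));` returns a fresh, fully configured generator on every `createWatermarkGenerator()` call. A runtime could make that call once per parallel instance.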
So +1 to the proposal.

Best,
Jark

On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:

> I am fine with that.
>
> Most of the principles seem agreed upon. I understand the need to support code-generated extractors; we should support most of it already (via the factories, as Aljoscha mentioned) and can extend this if needed.
>
> I think that the factory approach supports code-generated extractors in an even cleaner way than an extractor with an open/init method.
>
>
> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > We're slightly running out of time. I would propose we vote on the basic principle and remain open to later additions. This feature is quite important to make the new Kafka Source that is developed as part of FLIP-27 useful. Otherwise we would have to use the legacy interfaces in the newly added connector.
> >
> > I know that's a bit unorthodox, but would everyone be OK with what's currently there, and then we iterate?
> >
> > Best,
> > Aljoscha
> >
> > On 11.05.20 13:57, Aljoscha Krettek wrote:
> > > Ah, I meant to write this in my previous email, sorry about that.
> > >
> > > The WatermarkStrategy, which is basically a factory for a WatermarkGenerator, is the replacement for the open() method. This is the same strategy that was followed for StreamOperatorFactory, which was introduced to allow code generation in the Table API [1]. If we need metrics or other things, we would add them as a parameter to the factory method. What do you think?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-11974
> > >
> > > On 10.05.20 05:07, Jark Wu wrote:
> > >> Hi,
> > >>
> > >> Regarding `open()/close()`, I think it's necessary for Table&SQL to compile the generated code.
> > >> In Table&SQL, the watermark strategy and the event timestamp are defined using SQL expressions, and we translate them and generate Java code for the expressions. If we have `open()/close()`, we don't need lazy initialization.
> > >> Besides that, I can see a need to report some metrics, e.g. the current watermark, dirty timestamps (null values), etc.
> > >> So I think a simple `open()/close()` with a context from which a MetricGroup can be obtained would be nice and not too complex for the first version.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> > >>
> > >>> Thanks, Aljoscha, for picking this up.
> > >>>
> > >>> I agree with the approach of doing the set of changes proposed here for now. It already makes things simpler and adds idleness support everywhere.
> > >>>
> > >>> Rich functions and state always add complexity; let's do this in a next step, if we have a really compelling case.
> > >>>
> > >>>
> > >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>>
> > >>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is basically to turn emitting into a "flatMap". We give the WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can decide whether to output a watermark or not and can also mark the output as idle. Changing the interface to return a Watermark (as the previous watermark assigner interface did) would not allow that flexibility.
> > >>>>
> > >>>> Regarding checkpointing the watermark and keeping track of the minimum watermark, this would be the responsibility of the framework (or the KafkaConsumer in the current implementation). The user-supplied WG does not need to make sure the watermark doesn't regress.
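Here is a minimal sketch of the "collector" style and the division of responsibility described above. It uses simplified, hypothetical stand-ins for `WatermarkGenerator` and `WatermarkOutput`, not Flink's real classes. The generator is punctuated: it emits only for events whose payload starts with `WM`, which is a made-up marker convention. It never checks for regressions itself. The hypothetical `MonotonicWatermarkOutput` plays the framework's role. It forwards only increasing watermarks and remembers the current one, which is the value the framework would checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins; NOT Flink's actual classes.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Punctuated generator: it decides per event whether to emit, and it does not
// check whether the new watermark is lower than an earlier one.
final class MarkerWatermarkGenerator implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.startsWith("WM")) { // hypothetical marker convention
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // A punctuated generator emits nothing on the periodic timer.
    }
}

// Framework-side output: forwards only strictly increasing watermarks and
// keeps the current one, which is what the framework would checkpoint.
final class MonotonicWatermarkOutput implements WatermarkOutput {
    private final List<Long> downstream;
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle = false;

    MonotonicWatermarkOutput(List<Long> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermark) {
        idle = false; // any emit call marks the output as active again
        if (watermark > currentWatermark) {
            currentWatermark = watermark;
            downstream.add(watermark);
        }
    }

    @Override
    public void markIdle() {
        idle = true;
    }

    long getCurrentWatermark() {
        return currentWatermark;
    }

    boolean isIdle() {
        return idle;
    }
}
```

Because the regression check lives in the output, every user-supplied generator gets it for free. The same output can also receive `markIdle()`, which a return-value interface could not express.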
> > >>>>
> > >>>> Regarding making the WG a "rich function", I can see the potential benefit, but I also see a lot of pitfalls. For example, how should the watermark state be handled in the case of scale-in? It could be made to work in the Kafka case by attaching the state to the partition state that we keep, but then we have potential backwards compatibility problems for the WM state as well. Does the WG usually need to keep the state, or might it be enough if the state is transient? That is, after a restart the WG would lose its histogram, but it would rebuild it quickly and you would get back to the same steady state as before.
> > >>>>
> > >>>> Best,
> > >>>> Aljoscha
> > >>>>
> > >>>> On 27.04.20 12:12, David Anderson wrote:
> > >>>>> Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> > >>>>>
> > >>>>> I also like the idea of making the watermark generator a rich function; this should make it more straightforward to implement smarter watermark generators. E.g., one that uses state to keep statistics about the actual out-of-orderness and uses those statistics to implement a variable delay.
> > >>>>>
> > >>>>> David
> > >>>>>
> > >>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Aljoscha,
> > >>>>>>
> > >>>>>> Thanks for opening the discussion!
> > >>>>>>
> > >>>>>> I have two comments on the FLIP:
> > >>>>>> 1) We could add lifecycle methods to the Generator, i.e. open()/close(), probably with a Context as argument. I have not fully thought this through, but I think that this is more aligned with the rest of our rich functions.
> > >>>>>> In addition, it would allow us, for example, to initialize the Watermark value if we decide to checkpoint the watermark (see [1]). (I also do not know if Table/SQL needs to do anything in open().)
> > >>>>>> 2) Aligned with the above, and with the case where we want to checkpoint the watermark in mind, I am wondering how we could implement this in the future. The FLIP proposes to expose the WatermarkOutput in the methods of the WatermarkGenerator. Given the implicit contract that watermarks are non-decreasing, WatermarkOutput#emitWatermark() will (I assume) have a check that compares the last emitted WM against the provided one and emits it only if it is >=. Otherwise we risk users shooting themselves in the foot if they accidentally forget the check. Given that neither the WatermarkGenerator nor its caller knows whether the watermark was finally emitted (WatermarkOutput#emitWatermark returns void), who will be responsible for checkpointing the WM?
> > >>>>>>
> > >>>>>> Given this, why not define the methods as:
> > >>>>>>
> > >>>>>> public interface WatermarkGenerator<T> {
> > >>>>>>
> > >>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> > >>>>>>
> > >>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
> > >>>>>> }
> > >>>>>>
> > >>>>>> and let the caller be the one enforcing any invariants, such as non-decreasing watermarks? This way, the caller can checkpoint anything that is needed, as it will have complete knowledge of whether the WM was emitted or not.
> > >>>>>>
> > >>>>>> What do you think?
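The sketch below makes the alternative above concrete by moving enforcement to the caller. It uses a simplified variant of the proposed interface called `ReturningWatermarkGenerator`. That name is hypothetical. The variant drops the `WatermarkOutput` parameter and treats `null` as "no watermark"; both are assumptions. The caller, `WatermarkEmitter`, is also hypothetical. The caller decides what is actually emitted, so `snapshotWatermark()` returns exactly what was sent downstream. Restoring that value stops a generator that lost its state from regressing the watermark after a restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified value type; NOT Flink's Watermark class.
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// Simplified variant of the proposed interface: a null return means
// "no watermark for this call" (an assumption made for this sketch).
interface ReturningWatermarkGenerator<T> {
    Watermark onEvent(T event, long eventTimestamp);
    Watermark onPeriodicEmit();
}

// Example generator with no regression checks of its own.
final class MaxTimestampGenerator<T> implements ReturningWatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public Watermark onEvent(T event, long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return null; // this generator only emits on the periodic timer
    }

    @Override
    public Watermark onPeriodicEmit() {
        return maxTimestamp == Long.MIN_VALUE ? null : new Watermark(maxTimestamp - 1);
    }
}

// The caller enforces non-decreasing watermarks, so it knows exactly which
// value was emitted and can checkpoint it.
final class WatermarkEmitter<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final List<Long> downstream;
    private long lastEmitted;

    WatermarkEmitter(ReturningWatermarkGenerator<T> generator, List<Long> downstream,
                     long restoredWatermark) {
        this.generator = generator;
        this.downstream = downstream;
        this.lastEmitted = restoredWatermark; // Long.MIN_VALUE on a fresh start
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void onPeriodicTimer() {
        maybeEmit(generator.onPeriodicEmit());
    }

    long snapshotWatermark() {
        return lastEmitted;
    }

    private void maybeEmit(Watermark watermark) {
        if (watermark != null && watermark.getTimestamp() > lastEmitted) {
            lastEmitted = watermark.getTimestamp();
            downstream.add(lastEmitted);
        }
    }
}
```

Dropping the `WatermarkOutput` parameter keeps the sketch short. It also removes any way to signal idleness, which is the kind of flexibility the collector-based design keeps.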
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Kostas
> > >>>>>>
> > >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> > >>>>>>
> > >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification. We have already considered this FLIP in the interfaces for FLIP-95 [1] and look forward to updating to the new unified watermark generators once FLIP-126 has been accepted.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Timo
> > >>>>>>>
> > >>>>>>> [1] https://github.com/apache/flink/pull/11692
> > >>>>>>>
> > >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> > >>>>>>>> Hi Everyone!
> > >>>>>>>>
> > >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and separate) Watermark Assigners" [1]. This work was started by Stephan in an experimental branch. I expanded on that work to provide a PoC for the changes proposed in this FLIP: [2].
> > >>>>>>>>
> > >>>>>>>> Currently, we have two different flavours of Watermark Assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend TimestampAssigner. This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, and we have two operator implementations for the different flavours. This also makes features such as generic support for idleness detection more complicated to implement, because we again have to support two types of watermark assigners.
> > >>>>>>>>
> > >>>>>>>> In this FLIP we propose two things:
> > >>>>>>>>
> > >>>>>>>> 1. Unify the Watermark Assigners into one interface, WatermarkGenerator.
> > >>>>>>>> 2. Separate this new interface from the TimestampAssigner.
> > >>>>>>>>
> > >>>>>>>> The motivation for the first is to simplify future implementations and reduce code duplication. The motivation for the second point is again code deduplication: most assigners currently have to extend some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.
> > >>>>>>>>
> > >>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives within a configured timeout.
> > >>>>>>>>
> > >>>>>>>> The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.
> > >>>>>>>>
> > >>>>>>>> Please find more details in the FLIP [1]. Looking forward to your feedback.
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Aljoscha
> > >>>>>>>>
> > >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >>>>>>>>
> > >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
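Here is a sketch of the generic idleness-detecting wrapper proposed above. It again uses simplified stand-in interfaces rather than Flink's real ones. The hypothetical `IdlenessDetectingGenerator` delegates to any other generator. When a periodic emit is due and no event has arrived for `idleTimeoutMillis` of processing time, it marks the output idle once instead of delegating. The clock is injected as a `LongSupplier` so the timeout can be tested deterministically; this is an assumption of the sketch. In a real job the clock could be `System::currentTimeMillis`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified, hypothetical stand-ins; NOT Flink's actual classes.
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

// Wraps any generator and marks the output idle when no event has arrived
// for idleTimeoutMillis of processing time.
final class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // processing-time clock, injected for testing
    private long lastEventTime;
    private boolean idle = false;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, long idleTimeoutMillis,
                               LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            if (!idle) {
                output.markIdle(); // signal idleness once per idle period
                idle = true;
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

The wrapper relies only on the two generator methods. It therefore works around punctuated and periodic generators alike, which is the reuse the unified interface is meant to enable.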