I have similar thoughts to @Stephan.

Ad 1: I tried something like this on your branch:

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * For top-level classes that implement both Serializable and TimestampAssigner
     */
    public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

    @FunctionalInterface
    public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
    }

    /**
     * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
     * Helper method for serializable lambdas.
     */
    public WatermarkStrategies<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
        checkNotNull(timestampAssigner, "timestampAssigner");
        this.timestampAssigner = timestampAssigner;
        return this;
    }

But I understand if that's too hacky. It's just a pity that we must enforce
limitations on an interface that are not strictly necessary.

Ad 2/3: I am aware the watermark assigner/timestamp extractor can be applied
further down the graph. Originally I also wanted to suggest sourceTimestamp
and SourceTimestampAssigner, but then I realized it can be used also after
the sources as you correctly pointed out. Even if the TimestampAssigner is
used after the source there might be some native/record timestamp in the
StreamRecord, that could've been extracted by previous assigner.

Best,

Dawid

On 12/05/2020 16:47, Stephan Ewen wrote:
> @Aljoscha
>
> About (1) could we have an interface SerializableTimestampAssigner that
> simply mixes in the java.io.Serializable interface? Or will this be too
> clumsy?
>
> About (3) RecordTimeStamp seems to fit both cases (in-source-record
> timestamp, in stream-record timestamp).
>
> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1) and
>> 3).
>>
>> 1) I can see the benefit of that but in reality most timestamp assigners
>> will probably need to be Serializable. If you look at my (updated) POC
>> branch [1] you can see how a TimestampAssigner would be specified on the
>> WatermarkStrategies helper class: [2]. The signature of this would have
>> to be changed to something like:
>>
>>     public <TA extends TimestampAssigner<T> & Serializable>
>>     WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
>>
>> Then, however, it would not be possible for users to specify a lambda or
>> anonymous inner function for the TimestampAssigner like this:
>>
>>     WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
>>         .forGenerator(new PeriodicTestWatermarkGenerator())
>>         .withTimestampAssigner((event, timestamp) -> event)
>>         .build();
>>
>> 3) This makes sense if we only allow WatermarkStrategies on sources,
>> where the previous timestamp really is the "native" timestamp.
>> Currently, we also allow setting watermark strategies at arbitrary
>> points in the graph. I'm thinking we probably should only allow that in
>> sources but it's not the reality currently. I'm not against renaming it,
>> just voicing those thoughts.
>>
>> Best,
>> Aljoscha
>>
>>
>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
>> [2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
>>
>> On 12.05.20 15:48, Stephan Ewen wrote:
>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
>>>
>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> Sorry for adding comments during the vote, but I have some really minor
>>>> suggestions that should not influence the voting thread imo.
>>>>
>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's
>>>> Function?
>>>> This implies it has to be serializable which with the factory
>>>> pattern is not strictly necessary, right? BTW I really like that you
>>>> suggested the FunctionalInterface annotation there.
>>>>
>>>> 2) Could we rename the IdentityTimestampAssigner to e.g.
>>>> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
>>>> Personally I found the IdentityTimestampAssigner a bit misleading as it
>>>> usually means a no-op. Which did not click for me, as I assumed it
>>>> somehow returns the incoming record itself.
>>>>
>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to
>>>> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
>>>> above. This parameter was also a bit confusing for me as I thought at
>>>> times it's somehow related to
>>>> TimerService#currentProcessingTimestamp()/currentWatermark() as the
>>>> whole system currentTimestamp.
>>>>
>>>> Other than those three points I like the proposal and I was about to
>>>> vote +1 if it was not for those three points.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 11/05/2020 16:57, Jark Wu wrote:
>>>>> Thanks for the explanation. I like the factory pattern to make the
>>>>> member variables immutable and final.
>>>>>
>>>>> So +1 to the proposal.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> I am fine with that.
>>>>>>
>>>>>> Much of the principles seem agreed upon. I understand the need to
>>>>>> support code-generated extractors and we should support most of it
>>>>>> already (as Aljoscha mentioned via the factories) and can extend this
>>>>>> if needed.
>>>>>>
>>>>>> I think that the factory approach supports code-generated extractors
>>>>>> in a cleaner way even than an extractor with an open/init method.
>>>>>>
>>>>>>
>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> We're slightly running out of time.
>>>>>>> I would propose we vote on the basic
>>>>>>> principle and remain open to later additions. This feature is quite
>>>>>>> important to make the new Kafka Source that is developed as part of
>>>>>>> FLIP-27 useful. Otherwise we would have to use the legacy interfaces
>>>>>>> in the newly added connector.
>>>>>>>
>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's
>>>>>>> currently there and then we iterate?
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
>>>>>>>>
>>>>>>>> The WatermarkStrategy, which is basically a factory for a
>>>>>>>> WatermarkGenerator, is the replacement for the open() method. This is
>>>>>>>> the same strategy that was followed for StreamOperatorFactory, which
>>>>>>>> was introduced to allow code generation in the Table API [1]. If we
>>>>>>>> need metrics or other things we would add that as a parameter to the
>>>>>>>> factory method. What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
>>>>>>>>
>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Regarding `open()/close()`, I think it's necessary for Table&SQL to
>>>>>>>>> compile the generated code.
>>>>>>>>> In Table&SQL, the watermark strategy and event-timestamp are defined
>>>>>>>>> using SQL expressions, and we will translate and generate Java code
>>>>>>>>> for the expressions. If we have `open()/close()`, we don't need lazy
>>>>>>>>> initialization.
>>>>>>>>> Besides that, I can see a need to report some metrics, e.g. the
>>>>>>>>> current watermark, the dirty timestamps (null value), etc.
>>>>>>>>> So I think a simple `open()/close()` with a context which can get
>>>>>>>>> MetricGroup is nice and not complex for the first version.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>>>> Thanks, Aljoscha, for picking this up.
>>>>>>>>>>
>>>>>>>>>> I agree with the approach of doing the here proposed set of changes
>>>>>>>>>> for now. It already makes things simpler and adds idleness support
>>>>>>>>>> everywhere.
>>>>>>>>>>
>>>>>>>>>> Rich functions and state always add complexity, let's do this in a
>>>>>>>>>> next step, if we have a really compelling case.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself. The
>>>>>>>>>>> proposal is basically to turn emitting into a "flatMap", we give
>>>>>>>>>>> the WatermarkGenerator a "collector" (the WatermarkOutput) and the
>>>>>>>>>>> WG can decide whether to output a watermark or not and can also
>>>>>>>>>>> mark the output as idle. Changing the interface to return a
>>>>>>>>>>> Watermark (as the previous watermark assigner interface did) would
>>>>>>>>>>> not allow that flexibility.
>>>>>>>>>>>
>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the
>>>>>>>>>>> minimum watermark, this would be the responsibility of the
>>>>>>>>>>> framework (or the KafkaConsumer in the current implementation). The
>>>>>>>>>>> user-supplied WG does not need to make sure the watermark doesn't
>>>>>>>>>>> regress.
>>>>>>>>>>>
>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential
>>>>>>>>>>> benefit but I also see a lot of pitfalls. For example, how should
>>>>>>>>>>> the watermark state be handled in the case of scale-in?
>>>>>>>>>>> It could be made to
>>>>>>>>>>> work in the Kafka case by attaching the state to the partition
>>>>>>>>>>> state that we keep, but then we have potential backwards
>>>>>>>>>>> compatibility problems also for the WM state. Does the WG usually
>>>>>>>>>>> need to keep the state or might it be enough if the state is
>>>>>>>>>>> transient, i.e. if you have a restart the WG would lose its
>>>>>>>>>>> histogram but it would rebuild it quickly and you would get back to
>>>>>>>>>>> the same steady state as before?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>>
>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward,
>>>>>>>>>>>> Aljoscha.
>>>>>>>>>>>>
>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich
>>>>>>>>>>>> function -- this should make it more straightforward to implement
>>>>>>>>>>>> smarter watermark generators. E.g., one that uses state to keep
>>>>>>>>>>>> statistics about the actual out-of-orderness, and uses those
>>>>>>>>>>>> statistics to implement a variable delay.
>>>>>>>>>>>>
>>>>>>>>>>>> David
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Aljoscha,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for opening the discussion!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have two comments on the FLIP:
>>>>>>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
>>>>>>>>>>>>> close(), probably with a Context as argument: I have not fully
>>>>>>>>>>>>> thought this through but I think that this is more aligned with
>>>>>>>>>>>>> the rest of our rich functions.
>>>>>>>>>>>>> In addition, it will allow, for example, to
>>>>>>>>>>>>> initialize the Watermark value, if we decide to checkpoint the
>>>>>>>>>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
>>>>>>>>>>>>> anything in the open()).
>>>>>>>>>>>>> 2) aligned with the above, and with the case where we want to
>>>>>>>>>>>>> checkpoint the watermark in mind, I am wondering about how we
>>>>>>>>>>>>> could implement this in the future. In the FLIP, it is proposed
>>>>>>>>>>>>> to expose the WatermarkOutput in the methods of the
>>>>>>>>>>>>> WatermarkGenerator. Given that there is the implicit contract
>>>>>>>>>>>>> that watermarks are non-decreasing, the
>>>>>>>>>>>>> WatermarkOutput#emitWatermark() will have (I assume) a check that
>>>>>>>>>>>>> will compare the last emitted WM against the provided one, and
>>>>>>>>>>>>> emit it only if it is >=. If not, then we risk having users
>>>>>>>>>>>>> shooting themselves in the foot if they accidentally forget the
>>>>>>>>>>>>> check. Given that the WatermarkGenerator and its caller do not
>>>>>>>>>>>>> know if the watermark was finally emitted or not (the
>>>>>>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be
>>>>>>>>>>>>> responsible for checkpointing the WM?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Given this, why not have the methods as:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     public interface WatermarkGenerator<T> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>>>>>>>>>>>>>
>>>>>>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as
>>>>>>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
>>>>>>>>>>>>> anything that is needed as it will have complete knowledge as to
>>>>>>>>>>>>> whether the WM was emitted or not.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful
>>>>>>>>>>>>>> unification. We have considered this FLIP already in the
>>>>>>>>>>>>>> interfaces for FLIP-95 [1] and look forward to updating to the
>>>>>>>>>>>>>> new unified watermark generators once FLIP-126 has been
>>>>>>>>>>>>>> accepted.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
>>>>>>>>>>>>>>> Hi Everyone!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
>>>>>>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by
>>>>>>>>>>>>>>> Stephan in an experimental branch. I expanded on that work to
>>>>>>>>>>>>>>> provide a PoC for the changes proposed in this FLIP: [2].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark
>>>>>>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
>>>>>>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
>>>>>>>>>>>>>>> from TimestampAssigner. This means that sources that want to
>>>>>>>>>>>>>>> support watermark assignment/extraction in the source need to
>>>>>>>>>>>>>>> support two separate interfaces, and we have two operator
>>>>>>>>>>>>>>> implementations for the different flavours.
>>>>>>>>>>>>>>> Also, this makes features such as generic
>>>>>>>>>>>>>>> support for idleness detection more complicated to implement
>>>>>>>>>>>>>>> because we again have to support two types of watermark
>>>>>>>>>>>>>>> assigners.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this FLIP we propose two things:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one Interface
>>>>>>>>>>>>>>> WatermarkGenerator
>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The motivation for the first is to simplify future
>>>>>>>>>>>>>>> implementations and reduce code duplication. The motivation for
>>>>>>>>>>>>>>> the second point is again code deduplication: most assigners
>>>>>>>>>>>>>>> currently have to extend from some base timestamp extractor or
>>>>>>>>>>>>>>> duplicate the extraction logic, or users have to override an
>>>>>>>>>>>>>>> abstract method of the watermark assigner to provide the
>>>>>>>>>>>>>>> timestamp extraction logic.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping
>>>>>>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it
>>>>>>>>>>>>>>> can mark a stream/partition as idle if no data arrives after a
>>>>>>>>>>>>>>> configured timeout.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want
>>>>>>>>>>>>>>> to unify punctuated and periodic assigners but at the same time
>>>>>>>>>>>>>>> split the timestamp assigner from the watermark generator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
>>>>>>>>>>>>>>> your feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
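
For readers following point 1) of the thread, the mix-in idea can be tried in isolation. The sketch below is only an illustration under stated assumptions: `TimestampAssigner` and `SerializableTimestampAssigner` are simplified local stand-ins declared here, not the interfaces on the PoC branch, and `SerializableLambdaSketch`, `roundTrip`, `demo` and `plainLambdaIsSerializable` are hypothetical names. It relies on standard Java behaviour: a lambda is compiled as serializable only when its target type is Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Simplified stand-in for the assigner interface; deliberately NOT Serializable.
@FunctionalInterface
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// The mix-in discussed in the thread: adds nothing but java.io.Serializable.
@FunctionalInterface
interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
}

final class SerializableLambdaSketch {

    // Because the parameter type is Serializable, a lambda passed here is
    // compiled as a serializable lambda and survives Java serialization.
    static <T> TimestampAssigner<T> roundTrip(SerializableTimestampAssigner<T> assigner) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(assigner);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                TimestampAssigner<T> copy = (TimestampAssigner<T>) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // Serializes a lambda assigner, restores it, and applies the copy.
    static long demo() {
        TimestampAssigner<Long> copy =
                SerializableLambdaSketch.<Long>roundTrip((event, recordTimestamp) -> event);
        return copy.extractTimestamp(42L, -1L);
    }

    // The same lambda targeted at the plain interface is not Serializable.
    static boolean plainLambdaIsSerializable() {
        TimestampAssigner<Long> plain = (event, recordTimestamp) -> event;
        return plain instanceof Serializable;
    }
}
```

The sketch leaves out the generic `<TA extends TimestampAssigner<T> & Serializable>` overload from the message above; how the two overloads interact for lambda arguments is best checked against the actual branch.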