Hi Fabian,

I want to extract timestamps from my events. However, the event stream can be sparse at times (e.g., two days without any events). What's the best strategy for creating watermarks if I want real-time processing of the events that enter the stream?
Jayant Ameta

On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Another thing to point out is that watermarks are usually data-driven,
> i.e., they depend on the timestamps of the events and not on the clock of
> the machine.
> Otherwise, you might observe a lot of late data, i.e., events with
> timestamps smaller than the last watermark.
>
> If you assign timestamps and watermarks based on the clock of the machine,
> you might also use ingestion time instead of event time.
>
> 2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
>
>> Thanks Gary,
>> I was only trying with a fixed set of events, so the Watermark was not
>> advancing, like you said.
>>
>> Jayant Ameta
>>
>> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <g...@data-artisans.com> wrote:
>>
>>> Hi Jayant,
>>>
>>> The difference is that the Watermarks from
>>> BoundedOutOfOrdernessTimestampExtractor are based on the greatest
>>> timestamp of all previous events. That is, if you do not receive new
>>> events, the Watermark will not advance. In contrast, your custom
>>> implementation of AssignerWithPeriodicWatermarks always advances the
>>> Watermark based on the wall clock.
>>>
>>> Maybe this will already help you to debug your application. If not, it
>>> would be great to see a minimal working example.
>>>
>>> Best,
>>> Gary
>>>
>>> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyam...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
>>>> not firing. However, the trigger fires when using custom timestamp
>>>> extractor with similar watermark.
>>>>
>>>> Sample code below:
>>>>
>>>> 1. Assigner as an anonymous class, which works fine:
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>>>>         new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>>
>>>>     @Override
>>>>     public final Watermark getCurrentWatermark() {
>>>>         // this guarantees that the watermark never goes backwards.
>>>>         return new Watermark(System.currentTimeMillis() - 100);
>>>>     }
>>>> };
>>>>
>>>> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>>>>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>> };
>>>>
>>>> Do you see any difference in the approaches?
>>>>
>>>> - Jayant
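To make Gary's explanation concrete, here is a minimal plain-Java simulation of the watermark logic discussed in this thread. It uses no Flink classes, and every class and method name in it is hypothetical, not Flink API. `BoundedOutOfOrderness` mirrors the idea behind `BoundedOutOfOrdernessTimestampExtractor` (watermark = largest timestamp seen minus a fixed bound), and `wallClockWatermark` mirrors the custom assigner from the original question. `IdleAware` is one hypothetical way to handle the sparse-stream case: it stays data-driven while events arrive and falls back to the wall clock after an assumed idle timeout. That fallback only makes sense if event timestamps roughly track the machine clock, and, as Fabian points out, events that arrive after such a jump may be late.

```java
// Plain-Java simulation of periodic watermark strategies; no Flink classes are used.
// All class and method names here are hypothetical illustrations, not Flink API.
public class WatermarkSimulation {

    /** Watermark trails the largest timestamp seen so far by a fixed bound,
     *  like BoundedOutOfOrdernessTimestampExtractor. No events, no progress. */
    static class BoundedOutOfOrderness {
        final long maxOutOfOrderness;
        long currentMaxTimestamp;

        BoundedOutOfOrderness(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Start low enough that subtracting the bound cannot underflow.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        void onEvent(long timestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        }

        long currentWatermark() {
            return currentMaxTimestamp - maxOutOfOrderness;
        }
    }

    /** Watermark follows the machine clock, like the custom assigner in the thread. */
    static long wallClockWatermark(long nowMillis, long lag) {
        return nowMillis - lag;
    }

    /** Hypothetical hybrid: data-driven while events arrive, clock-driven once no
     *  event has been seen for idleTimeout ms (assumes timestamps ~ machine clock). */
    static class IdleAware extends BoundedOutOfOrderness {
        final long idleTimeout;
        long lastEventAt;                  // wall-clock time of the last event
        long lastEmitted = Long.MIN_VALUE; // last watermark handed out

        IdleAware(long maxOutOfOrderness, long idleTimeout, long startMillis) {
            super(maxOutOfOrderness);
            this.idleTimeout = idleTimeout;
            this.lastEventAt = startMillis;
        }

        void onEvent(long timestamp, long nowMillis) {
            onEvent(timestamp);
            lastEventAt = nowMillis;
        }

        long currentWatermark(long nowMillis) {
            long candidate = currentWatermark();
            if (nowMillis - lastEventAt > idleTimeout) {
                // Stream looks idle: let the watermark advance with the clock.
                candidate = Math.max(candidate, wallClockWatermark(nowMillis, maxOutOfOrderness));
            }
            // Never let the emitted watermark go backwards.
            lastEmitted = Math.max(lastEmitted, candidate);
            return lastEmitted;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness bounded = new BoundedOutOfOrderness(100);
        IdleAware idleAware = new IdleAware(100, 1_500, 0);

        // A fixed set of events whose timestamps equal their arrival time.
        for (long ts : new long[] {1_000, 1_050, 1_200}) {
            bounded.onEvent(ts);
            idleAware.onEvent(ts, ts);
        }

        // Periodic watermark checks after the stream has gone quiet.
        for (long now = 2_000; now <= 4_000; now += 1_000) {
            System.out.println("now=" + now
                    + " bounded=" + bounded.currentWatermark()
                    + " wallClock=" + wallClockWatermark(now, 100)
                    + " idleAware=" + idleAware.currentWatermark(now));
        }
    }
}
```

Running `main` shows the bounded watermark stuck at 1100 once the fixed set of events is exhausted, so an event-time window that ends after that point never fires, which matches what Jayant observed. The wall-clock variant keeps advancing regardless of input, and the idle-aware variant starts advancing only after the assumed 1500 ms of silence.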