A 10-minute tumbling window that starts at 12:00 is evaluated once a watermark > 12:10 is observed. If the same tumbling window has an allowed lateness of 5 minutes, it is evaluated once a watermark > 12:15 is observed. However, only elements with timestamps 12:00 <= x < 12:10 are in the window. Elements that arrive after the allowed lateness period has passed are simply dropped.
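To make the arithmetic concrete, here is a small plain-Java sketch of the rule as described above. It is only a toy model and does not use Flink: the class `TumblingWindowSketch` and its methods `contains`, `isEvaluated`, and `accepts` are hypothetical names made up for illustration, and Flink's own trigger and boundary handling may differ in detail.

```java
import java.time.Duration;

/** Toy model of the rule described above; NOT Flink's implementation. */
final class TumblingWindowSketch {
    final long start;            // inclusive window start, in milliseconds
    final long end;              // exclusive window end, in milliseconds
    final long allowedLateness;  // allowed lateness, in milliseconds

    TumblingWindowSketch(long start, Duration size, Duration allowedLateness) {
        this.start = start;
        this.end = start + size.toMillis();
        this.allowedLateness = allowedLateness.toMillis();
    }

    /** An element is in the window iff start <= timestamp < end. */
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    /** Evaluated once a watermark later than end + allowed lateness is observed. */
    boolean isEvaluated(long watermark) {
        return watermark > end + allowedLateness;
    }

    /** An element still joins the window if it is in range and lateness has not passed. */
    boolean accepts(long timestamp, long currentWatermark) {
        return contains(timestamp) && !isEvaluated(currentWatermark);
    }

    /** Helper: minutes since midnight converted to milliseconds. */
    static long minutes(long m) {
        return Duration.ofMinutes(m).toMillis();
    }

    public static void main(String[] args) {
        long noon = minutes(12 * 60);
        TumblingWindowSketch w = new TumblingWindowSketch(
                noon, Duration.ofMinutes(10), Duration.ofMinutes(5));

        // Watermark at 12:12 is past the window end but within the allowed lateness.
        System.out.println("evaluated at 12:12? " + w.isEvaluated(noon + minutes(12)));
        // Watermark at 12:16 is past 12:15.
        System.out.println("evaluated at 12:16? " + w.isEvaluated(noon + minutes(16)));
        // An element stamped 12:09 arriving while the watermark is at 12:12.
        System.out.println("12:09 element accepted at 12:12? "
                + w.accepts(noon + minutes(9), noon + minutes(12)));
        // The same element arriving while the watermark is at 12:16.
        System.out.println("12:09 element accepted at 12:16? "
                + w.accepts(noon + minutes(9), noon + minutes(16)));
    }
}
```

Setting the allowed lateness to `Duration.ZERO` in the sketch gives the first case above, where the window is evaluated as soon as a watermark > 12:10 is seen.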
Best, Fabian

2016-09-01 20:42 GMT+02:00 Paul Joireman <paul.joire...@physiq.com>:

> Thanks Fabian,
>
> This is making more sense. Is allowedLateness(Time.seconds(x)) then
> evaluated relative to maxEventTime - lastWaterMarkTime. So if (maxEventTime
> - lastWaterMarkTime) > x * 1000 then the window is evaluated?
>
> Paul
> ------------------------------
> *From:* Fabian Hueske <fhue...@gmail.com>
> *Sent:* Thursday, September 1, 2016 1:25:55 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Windows and Watermarks Clarification
>
> Hi Paul,
>
> BoundedOutOfOrdernessTimestampExtractor implements the
> AssignerWithPeriodicWatermarks interface.
> This means, Flink will ask the assigner in regular intervals (configurable
> via StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval())
> for the current watermark.
> The watermark will be 10secs earlier than the highest observed timestamp
> so far.
>
> An event-time window is evaluated when the current watermark is higher /
> later than the window's end time. With allowedLateness() the window
> evaluation can be deferred to allow late elements (elements whose timestamp
> is before the current watermark) to join the window before it is evaluated.
>
> Let me know if you have further questions,
> Fabian
>
>
> 2016-09-01 20:16 GMT+02:00 Paul Joireman <paul.joire...@physiq.com>:
>
>> Hi all,
>>
>> Just a point of clarification on how watermarks are generated. I'd like
>> to use a SlidingEventTime window of say 5 minutes with a 30 second slide.
>> The incoming data stream has elements from which I can extract the
>> timestamp but they may come out of order so I chose to implement the
>> following timestamp assigner.
>>
>> my_stream.assignTimestampsAndWatermarks(
>>     new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
>>         @Override
>>         public long extractTimestamp(final MyElement element) {
>>             return element.getTimestamp();
>>         }
>>     });
>>
>> With this definition and the code for
>> BoundedOutOfOrdernessTimestampExtractor, my understanding is that for each
>> incoming element a watermark will be generated that is 10 seconds behind
>> the current timestamp. If the end time of any of the sliding windows is
>> earlier than an emitted watermark, that window (or those windows) will
>> fire, initiating processing of the window(s).
>> Is this correct?
>>
>> Paul