I have to extend my answer: the allowedLateness behavior that I described applies only if the window trigger returns FIRE when the window is evaluated (this is the default behavior of most triggers).
If the trigger returns FIRE_AND_PURGE, the window state is purged when the window function is evaluated, and late events are processed alone, i.e., in my example <12:09, G> would be processed without [A, B, C, D].
When the allowed lateness has passed, all window state is purged regardless of the trigger.

Best,
Fabian

2016-10-17 16:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> the difference is the following:
>
> 1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
> extractor and watermark assigner.
> A timestamp extractor tells Flink when an event happened, i.e., it
> extracts a timestamp from the event. A watermark assigner tells Flink what
> the current logical time is.
> The BoundedOutOfOrdernessTimestampExtractor works as follows: when Flink
> asks what the current time is, it returns the latest observed timestamp
> minus a configurable bound. This is the safety margin for late data.
> A record whose timestamp is lower than the last watermark is considered
> to be late.
>
> 2) The allowedLateness parameter of time windows tells Flink how long to
> keep state around after the window was evaluated.
> If data arrives after the evaluation and before the allowedLateness has
> passed, the window function is applied again and an update is sent out.
>
> Let's look at an example.
> Assume you have a BoundedOutOfOrdernessTimestampExtractor (BOOTE) with a
> 2 minute bound and a 10 minute tumbling window that starts at 12:00 and
> ends at 12:10.
>
> If you have the following data:
>
> 12:01, A
> 12:04, B
> WM, 12:02 // 12:04 - 2 minutes
> 12:02, C
> 12:08, D
> 12:14, E
> WM, 12:12 // 12:14 - 2 minutes
> 12:16, F
> WM, 12:14 // 12:16 - 2 minutes
> 12:09, G
>
> == no allowed lateness
> The window operator forwards the logical time to 12:12 when it receives
> <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this
> time, and finally purges its state. <12:09, G> is later ignored.
>
> == allowed lateness of 3 minutes
> The window operator evaluates the window when <WM, 12:12> is received, but
> its state is not purged yet. The state is purged when <WM, 12:14> is
> received (window end time 12:10 + 3 minutes of allowed lateness = 12:13).
> <12:09, G> is again ignored.
>
> == allowed lateness of 5 minutes
> The window operator evaluates the window when <WM, 12:12> is received, but
> its state is not purged yet. When <12:09, G> is received, the window is
> evaluated again, this time with [A, B, C, D, G], and an update is sent
> out. The state is purged when a watermark of >= 12:15 is received.
>
> So, watermarks tell Flink what time it is, and allowed lateness tells
> the system when state should be discarded and all later arriving data be
> ignored.
> These issues are related but not exactly the same thing. For instance, you
> can counter late data by increasing either the bound or the lateness
> parameter.
> Increasing the watermark bound will yield higher latencies because windows
> are evaluated later.
> Configuring allowedLateness will allow for earlier results, but you have
> to cope with the updates downstream.
>
> Please let me know if you have questions.
>
> Best,
> Fabian
>
> 2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi,
>>
>> I'm a bit confused about how Flink deals with late elements after the
>> introduction of allowedLateness to windows. What is the difference between
>> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
>> allowedLateness(Time.seconds(X))? What if one is used and the other is
>> not? And what if a different lateness is used in each one? Could you please
>> clarify it on the basis of a simple example? Thank you.
>>
>> Best,
>> Yassine
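To make the 12:00-12:10 example concrete, here is a small, self-contained Java sketch. It does not use Flink's API: it is a hypothetical simulation (the class LatenessSimulation, the Item type, and the run method are made up for illustration) that replays the event/watermark sequence from the example against the single window [12:00, 12:10). It applies the rules described in this thread: fire when the watermark passes the window end, fire again for late elements that arrive within the allowed lateness, drop all state once the watermark passes window end + allowed lateness, and optionally clear the contents on every firing to mimic a trigger returning FIRE_AND_PURGE. Times are simplified to whole minutes after 12:00.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the example in this thread. NOT Flink code:
 * it replays events and watermarks against one tumbling window [12:00, 12:10).
 * Times are minutes after 12:00 (e.g. 9 means 12:09).
 */
public class LatenessSimulation {

    /** One stream element: either a timestamped event or a watermark. */
    static final class Item {
        final boolean isWatermark;
        final int minute;
        final String value;

        private Item(boolean isWatermark, int minute, String value) {
            this.isWatermark = isWatermark;
            this.minute = minute;
            this.value = value;
        }

        static Item event(int minute, String value) { return new Item(false, minute, value); }
        static Item watermark(int minute) { return new Item(true, minute, null); }
    }

    static final int WINDOW_START = 0; // 12:00
    static final int WINDOW_END = 10;  // 12:10 (exclusive)

    /** The input from the example; watermarks are latest timestamp minus 2 minutes. */
    static List<Item> exampleInput() {
        return List.of(
            Item.event(1, "A"), Item.event(4, "B"), Item.watermark(2),
            Item.event(2, "C"), Item.event(8, "D"), Item.event(14, "E"), Item.watermark(12),
            Item.event(16, "F"), Item.watermark(14),
            Item.event(9, "G"));
    }

    /**
     * Returns the window contents for every evaluation, in order.
     *
     * @param allowedLateness minutes of state retention after the window end
     * @param purgeOnFire     true mimics a trigger returning FIRE_AND_PURGE, false mimics FIRE
     */
    static List<List<String>> run(List<Item> input, int allowedLateness, boolean purgeOnFire) {
        List<List<String>> firings = new ArrayList<>();
        List<String> state = new ArrayList<>();
        int watermark = Integer.MIN_VALUE;
        boolean fired = false;
        boolean cleanedUp = false;

        for (Item item : input) {
            if (item.isWatermark) {
                watermark = Math.max(watermark, item.minute);
                // On-time evaluation: the watermark has passed the end of the window.
                if (!fired && watermark >= WINDOW_END) {
                    fired = true;
                    firings.add(new ArrayList<>(state));
                    if (purgeOnFire) state.clear();
                }
                // Cleanup: window end + allowed lateness has passed, so all state is dropped.
                if (!cleanedUp && watermark >= WINDOW_END + allowedLateness) {
                    cleanedUp = true;
                    state.clear();
                }
            } else if (item.minute >= WINDOW_START && item.minute < WINDOW_END) {
                if (cleanedUp) continue; // state is gone: the late element is ignored
                state.add(item.value);
                // A late element within the allowed lateness triggers another evaluation.
                if (fired) {
                    firings.add(new ArrayList<>(state));
                    if (purgeOnFire) state.clear();
                }
            }
            // Events outside [12:00, 12:10), such as E and F, belong to other windows and are not modeled.
        }
        return firings;
    }

    public static void main(String[] args) {
        List<Item> input = exampleInput();
        System.out.println("no allowed lateness:    " + run(input, 0, false));
        System.out.println("3 min allowed lateness: " + run(input, 3, false));
        System.out.println("5 min allowed lateness: " + run(input, 5, false));
        System.out.println("5 min, FIRE_AND_PURGE:  " + run(input, 5, true));
    }
}
```

Running main prints [[A, B, C, D]] for no lateness and for 3 minutes, [[A, B, C, D], [A, B, C, D, G]] for 5 minutes, and [[A, B, C, D], [G]] for 5 minutes with a purging trigger, matching the cases described above. Flink itself works with millisecond timestamps (a window's last timestamp is its end minus 1 ms), which this minute-level model ignores.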