Hi Yassine, the difference is the following:
1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp extractor and watermark assigner. A timestamp extractor tells Flink when an event happened, i.e., it extracts a timestamp from the event. A watermark assigner tells Flink what the current logical time is. The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink asks what the current time is, it returns the highest timestamp observed so far minus a configurable bound. This bound is the safety margin for late data. A record whose timestamp is lower than the last watermark is considered to be late.

2) The allowedLateness parameter of time windows tells Flink how long to keep state around after the window was evaluated. If data arrives after the evaluation and before the allowedLateness has passed, the window function is applied again and an update is sent out.

Let's look at an example. Assume you have a BoundedOutOfOrdernessTimestampExtractor with a 2 minute bound and a 10 minute tumbling window that starts at 12:00 and ends at 12:10, and you receive the following data:

  12:01, A
  12:04, B
  WM, 12:02 // 12:04 - 2 minutes
  12:02, C
  12:08, D
  12:14, E
  WM, 12:12 // 12:14 - 2 minutes
  12:16, F
  WM, 12:14 // 12:16 - 2 minutes
  12:09, G

== no allowed lateness
The window operator forwards the logical time to 12:12 when it receives <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this time, and finally purges its state. <12:09, G> arrives later and is ignored.

== allowed lateness of 3 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. The state is purged when <WM, 12:14> is received, because that watermark has passed 12:13 (window end time 12:10 + 3 minutes allowed lateness). <12:09, G> is again ignored.

== allowed lateness of 5 minutes
The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. When <12:09, G> is received, the window is evaluated again, this time with [A, B, C, D, G], and an update is sent out. The state is purged when a watermark of >=12:15 is received.

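To make the three cases easier to replay, here is a small sketch in plain Python that walks through the example stream above. It is not Flink code and does not use the Flink API: the names hm, bounded_watermark, STREAM and simulate are made up for illustration, time is tracked in whole minutes, and the firing and purging rules are simplified to "watermark >= window end" and "watermark >= window end + allowed lateness".

```python
def hm(s):
    """Convert "HH:MM" to minutes since midnight."""
    h, m = s.split(":")
    return int(h) * 60 + int(m)


def bounded_watermark(timestamps, bound_min):
    """Watermark = highest timestamp seen so far minus the bound (in minutes)."""
    return max(hm(t) for t in timestamps) - bound_min


WINDOW_START, WINDOW_END = hm("12:00"), hm("12:10")

# The stream from the example. ("WM", time) is a watermark,
# anything else is a record (time, name).
STREAM = [
    ("12:01", "A"), ("12:04", "B"), ("WM", "12:02"),
    ("12:02", "C"), ("12:08", "D"), ("12:14", "E"), ("WM", "12:12"),
    ("12:16", "F"), ("WM", "12:14"), ("12:09", "G"),
]


def simulate(allowed_lateness_min):
    """Replay STREAM for the window [12:00, 12:10).

    Returns (emitted, dropped): every result the window sends downstream,
    and the names of records ignored because the state was already purged.
    """
    cleanup_time = WINDOW_END + allowed_lateness_min
    contents = []    # window state
    fired = False    # has the window been evaluated at least once?
    purged = False   # has the window state been discarded?
    emitted, dropped = [], []

    for first, second in STREAM:
        if first == "WM":
            watermark = hm(second)
            if not fired and watermark >= WINDOW_END:
                emitted.append(list(contents))  # regular evaluation
                fired = True
            if watermark >= cleanup_time:
                contents = []                   # state is discarded
                purged = True
            continue

        ts, name = hm(first), second
        if not (WINDOW_START <= ts < WINDOW_END):
            continue  # E and F belong to a later window
        if purged:
            dropped.append(name)                # too late, ignored
        else:
            contents.append(name)
            if fired:
                emitted.append(list(contents))  # late update


    return emitted, dropped


if __name__ == "__main__":
    for lateness in (0, 3, 5):
        print(lateness, simulate(lateness))
```

With an allowed lateness of 0 or 3 minutes, simulate returns a single result [A, B, C, D] and reports G as dropped. With 5 minutes it returns [A, B, C, D] followed by the update [A, B, C, D, G], and nothing is dropped.
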
So, watermarks tell Flink what time it is, and allowed lateness tells the system when state should be discarded and all later-arriving data ignored. These issues are related but not exactly the same thing. For instance, you can counter late data by increasing the bound or the lateness parameter. Increasing the watermark bound will yield higher latencies, as windows are evaluated later. Configuring allowedLateness will allow for earlier results, but you have to cope with the updates downstream.

Please let me know if you have questions.

Best, Fabian

2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi,
>
> I'm a bit confused about how Flink deals with late elements after the
> introduction of allowedlateness to windows. What is the difference between
> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
> allowedlateness(Time.seconds(X))? What if one is used and the other is
> not? and what if a different lateness is used in each one? Could you please
> clarify it on basis of a simple example? Thank you.
>
> Best,
> Yassine
>