Hi Fabian,

Thank you very much for the great answer and example, I appreciate it! It is all clear now.

Best,
Yassine

2016-10-17 16:29 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> I have to extend my answer:
>
> The allowedLateness behavior that I described applies only if the window
> trigger returns FIRE when the window is evaluated (this is the default
> behavior of most triggers).
>
> If the trigger returns FIRE_AND_PURGE, the state of the window is
> purged when the function is evaluated and late events are processed alone,
> i.e., in my example <12:09, G> would be processed without [A, B, C, D].
> Once the allowed lateness has passed, all window state is purged regardless
> of the trigger.
>
> Best, Fabian
>
> 2016-10-17 16:24 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Yassine,
>>
>> the difference is the following:
>>
>> 1) The BoundedOutOfOrdernessTimestampExtractor is a built-in timestamp
>> extractor and watermark assigner.
>> A timestamp extractor tells Flink when an event happened, i.e., it
>> extracts a timestamp from the event. A watermark assigner tells Flink what
>> the current logical time is.
>> The BoundedOutOfOrdernessTimestampExtractor works as follows: When Flink
>> asks what the current time is, it returns the highest timestamp observed
>> so far minus a configurable bound. This is the safety margin for late data.
>> A record whose timestamp is lower than the last watermark is considered
>> to be late.
>>
>> 2) The allowedLateness parameter of time windows tells Flink how long to
>> keep state around after the window was evaluated.
>> If data arrives after the evaluation and before the allowedLateness has
>> passed, the window function is applied again and an update is sent out.
>>
>> Let's look at an example.
>> Assume you have a BoundedOutOfOrdernessTimestampExtractor (BOOTE) with a
>> 2 minute bound and a 10 minute tumbling window that starts at 12:00 and
>> ends at 12:10:
>>
>> If you have the following data:
>>
>> 12:01, A
>> 12:04, B
>> WM, 12:02 // 12:04 - 2 minutes
>> 12:02, C
>> 12:08, D
>> 12:14, E
>> WM, 12:12 // 12:14 - 2 minutes
>> 12:16, F
>> WM, 12:14 // 12:16 - 2 minutes
>> 12:09, G
>>
>> == no allowed lateness
>> The window operator advances its logical time to 12:12 when it receives
>> <WM, 12:12>, evaluates the window, which contains [A, B, C, D] at this
>> time, and finally purges its state. <12:09, G> arrives later and is ignored.
>>
>> == allowed lateness of 3 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. The state is purged when <WM, 12:14> is
>> received (window end 12:10 + 3 minutes allowed lateness = 12:13, which
>> this watermark has passed). <12:09, G> is again ignored.
>>
>> == allowed lateness of 5 minutes
>> The window operator evaluates the window when <WM, 12:12> is received,
>> but its state is not purged yet. When <12:09, G> is received, the window is
>> evaluated again, but this time with [A, B, C, D, G], and an update is sent
>> out. The state is purged when a watermark of >=12:15 is received.
>>
>> So, watermarks tell Flink what time it is, and allowed lateness tells
>> the system when state should be discarded and all later arriving data be
>> ignored.
>> These issues are related but not exactly the same thing. For instance, you
>> can counter late data by increasing the bound or the lateness parameter.
>> Increasing the watermark bound will yield higher latencies as windows are
>> evaluated later.
>> Configuring allowedLateness will allow for earlier results, but you have
>> to cope with the updates downstream.
>>
>> Please let me know if you have questions.
>>
>> Best, Fabian
>>
>> 2016-10-17 11:52 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> Hi,
>>>
>>> I'm a bit confused about how Flink deals with late elements after the
>>> introduction of allowedLateness to windows. What is the difference between
>>> using a BoundedOutOfOrdernessTimestampExtractor(Time.seconds(X)) and
>>> allowedLateness(Time.seconds(X))? What if one is used and the other is
>>> not? And what if a different lateness is used in each one? Could you please
>>> clarify it on the basis of a simple example? Thank you.
>>>
>>> Best,
>>> Yassine
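
The example timeline from Fabian's reply can be replayed with a small simulation. What follows is a minimal sketch in plain Java, not Flink code: the class AllowedLatenessDemo, its Element type, and the run method are hypothetical names made up for illustration. It assumes times are whole minutes after 12:00, tracks only the single window [12:00, 12:10), takes the watermarks exactly as listed in the example, and assumes a trigger that returns FIRE (not FIRE_AND_PURGE). Flink itself works with millisecond timestamps, so the boundary comparisons here are a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AllowedLatenessDemo {

    /** One stream element: a record (label != null) or a watermark (label == null). */
    public static final class Element {
        final int minute;    // minutes after 12:00
        final String label;  // null marks a watermark

        private Element(int minute, String label) {
            this.minute = minute;
            this.label = label;
        }

        static Element event(int minute, String label) { return new Element(minute, label); }
        static Element watermark(int minute) { return new Element(minute, null); }
    }

    /** The stream from the example, in arrival order. */
    public static List<Element> exampleStream() {
        return Arrays.asList(
            Element.event(1, "A"), Element.event(4, "B"), Element.watermark(2),
            Element.event(2, "C"), Element.event(8, "D"), Element.event(14, "E"),
            Element.watermark(12), Element.event(16, "F"), Element.watermark(14),
            Element.event(9, "G"));
    }

    /** Replays the stream against the window [0, 10) and returns every emitted result. */
    public static List<List<String>> run(List<Element> stream, int allowedLateness) {
        final int windowStart = 0;
        final int windowEnd = 10;
        // Simplified rule: state is dropped once the watermark reaches end + lateness.
        final int cleanupTime = windowEnd + allowedLateness;

        int watermark = Integer.MIN_VALUE;
        List<String> state = new ArrayList<>();
        List<List<String>> results = new ArrayList<>();

        for (Element e : stream) {
            if (e.label == null) {
                int previous = watermark;
                watermark = Math.max(watermark, e.minute);
                // First evaluation: the watermark crosses the end of the window.
                if (previous < windowEnd && watermark >= windowEnd && !state.isEmpty()) {
                    results.add(new ArrayList<>(state));
                }
                // Allowed lateness has passed: purge the window state.
                if (watermark >= cleanupTime) {
                    state.clear();
                }
            } else if (e.minute >= windowStart && e.minute < windowEnd) {
                if (watermark >= cleanupTime) {
                    continue; // too late, the record is ignored
                }
                state.add(e.label);
                // Late but within the allowed lateness: evaluate again, send an update.
                if (watermark >= windowEnd) {
                    results.add(new ArrayList<>(state));
                }
            }
            // Records outside [0, 10), such as E and F, belong to other windows.
        }
        return results;
    }

    public static void main(String[] args) {
        for (int lateness : new int[] {0, 3, 5}) {
            System.out.println(lateness + " min -> " + run(exampleStream(), lateness));
        }
        // 0 min -> [[A, B, C, D]]
        // 3 min -> [[A, B, C, D]]
        // 5 min -> [[A, B, C, D], [A, B, C, D, G]]
    }
}
```

With 0 and 3 minutes of allowed lateness the window is emitted once and G is dropped, while with 5 minutes G arrives before the state is purged and triggers a second, updated result, matching the three cases in the reply.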