Hi Shay,

I would suggest trying allowed lateness, with the 500 ms you mention:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness

It might also work for processing time.
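The sketch below is plain Java with no Flink dependency. It shows the arithmetic that allowed lateness is meant to change. Window state is kept until the window's last timestamp plus the lateness, and only then purged. The model follows the cleanupTime method you mention, but the class and method names are hypothetical.

In a real job the lateness would be set on the windowed stream, e.g. `allowedLateness(Time.milliseconds(500))`. Whether WindowOperator applies it to processing-time windows is worth checking in cleanupTime for the Flink version you run.

```java
// Hypothetical model, not Flink code: when does a window's state get purged?
public class CleanupTimeSketch {

    // A window [start, end) in milliseconds. As with Flink's TimeWindow,
    // the last timestamp that belongs to the window is end - 1.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }
    }

    // Purge time = last timestamp of the window + allowed lateness.
    // If the addition overflows, fall back to "never purge" (Long.MAX_VALUE).
    static long cleanupTime(Window w, long allowedLatenessMs) {
        long t = w.maxTimestamp() + allowedLatenessMs;
        return t >= w.maxTimestamp() ? t : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        Window w = new Window(60_000, 120_000);
        System.out.println(cleanupTime(w, 0));   // prints 119999
        System.out.println(cleanupTime(w, 500)); // prints 120499
    }
}
```

With 500 ms of lateness, a one-minute window ending at 120000 would keep its state until 120499 instead of 119999.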
Cheers,
Andrey

> On 18 Jul 2018, at 17:22, Shimony, Shay <shay.shim...@citi.com> wrote:
>
> Hi,
>
> It seems we are encountering a race between the aggregation thread and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would like your help to fix it or work around it.
>
> First, a few details about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use a customized sliding window of size 1 minute, slide 10 seconds.
>   We think it can also happen with a tumbling window, so for simplicity let's assume a tumbling window of 1 minute.
> · Our window Trigger FIREs on each element.
> · We have a constant, balanced rate of 2k incoming messages/sec.
> · By "window state" I simply mean the aggregation value in it.
>
> If the timestamp of an element is very close to the end of the window, the element is of course assigned to that window. Occasionally, though, the window times out and is cleared before the element is aggregated with the window state. We thus lose the previous aggregation value and get a new aggregation state containing only the element's value.
>
> Below is the story as seen by the threads. Timestamps are logical.
>
> Suppose we are at the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>
> Reducer thread                              Time Trigger thread
> ------------------------------------------  --------------------------------
> Assign element to window [60, 120],
> because context.getCurrentProcessingTime()
> returned 119 (in assignWindows)
>                                             Time is 120 → clear window state
> Add the element value to window state
> [60, 120] (it starts from a new state)
>
> Our questions:
> 1. Is it a legitimate race?
>    We expected (1) assigning an element to a window and aggregating it into the window state, and (2) clearing the window, to be atomic with respect to each other. That is, if an element is valid for a window, it is assigned to that window and fully aggregated into its state, and only then can the window be cleared.
> 2. How could we make the Time Trigger thread wait a little before cleaning the window, e.g. by adding 500 ms to the window cleanup schedule?
>    We thought of overriding WindowOperator.cleanupTime. Is it possible to easily replace WindowOperator with our own?
> 3. Maybe you have a different idea for working around it?
>
> Thanks!
> Shay
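The sketch below replays the interleaving described above in a single thread. It is plain Java with no Flink classes. The names, the sum aggregation, and the logical time units (tumbling windows of size 60) are assumptions taken from the example.

The timer's clear step runs between window assignment and aggregation. A cleanup delay, like the 500 ms proposed in question 2, keeps the earlier aggregate.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical deterministic replay of the race; not Flink code.
public class WindowRaceReplay {

    static final long SIZE = 60; // logical tumbling window size

    // Start of the tumbling window a timestamp falls into.
    static long windowStart(long ts) {
        return ts - (ts % SIZE);
    }

    // Replays: assign at t=119, clock reaches 120, timer may clear, element is aggregated.
    // Returns the final aggregate (a sum) of the window the element was assigned to.
    static long replay(long previousSum, long elementValue, long cleanupDelay) {
        Map<Long, Long> state = new HashMap<>();

        // Reducer thread: element assigned to [60, 120) because the clock read 119.
        long start = windowStart(119);
        state.put(start, previousSum); // aggregate built from earlier elements

        // Time Trigger thread: clock reaches 120; clear if cleanup time has passed.
        long now = 120;
        long cleanupAt = start + SIZE + cleanupDelay;
        if (now >= cleanupAt) {
            state.remove(start);
        }

        // Reducer thread resumes: add the element to whatever state exists now.
        return state.merge(start, elementValue, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(replay(1_000, 5, 0)); // prints 5: previous aggregate lost
        System.out.println(replay(1_000, 5, 1)); // prints 1005: cleanup deferred
    }
}
```

Even in this model, a delay only makes it less likely that the clear lands between the two steps. It does not make assignment and aggregation atomic with respect to cleanup.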