Hi,

It seems that we are hitting a race condition between the aggregation thread and the Time Trigger thread. It might not be a bug, but it still seems strange to us, and we would appreciate your help in fixing it or working around it.
First, a few details about our use case and system:
· We are working with processing time.
· We are using Flink 1.4.
· We use a customized sliding window of size 1 minute with a 10-second slide. We think the same thing can also happen with a tumbling window, so for simplicity let's assume a tumbling window of 1 minute.
· Our window Trigger returns FIRE for each element.
· We have a constant, balanced rate of 2k incoming messages/sec.
· By "window state" I simply mean the aggregation value in it.

If the timestamp of an element is very close to the end of a window, the element is of course assigned to that window. Occasionally, however, the window times out and is cleared before the element is aggregated into the window state. As a result, we lose the previous aggregation value and get a new aggregation state that contains only the element's value.

Below is the sequence as seen by the two threads. Timestamps are logical. Suppose we are at the beginning of WindowOperator.processElement, and the current time is 119 (nearly 120).

Reducer thread                                  Time Trigger thread
----------------------------------------------  ----------------------------------
Assign element to window [60, 120], because
context.getCurrentProcessingTime() returned
119 (in assignWindows)
                                                Time is 120 --> clear window state
Add the element value to window state
[60, 120] (it starts from a new state)

Our questions:
1. Is this a legitimate race? We expected (1) assigning an element to a window and aggregating it into the window state, and (2) clearing the window, to be atomic with respect to each other. That is, if an element is valid for a window, it is assigned to that window and fully aggregated into its state, and only then can the window be cleared.
2. How can we make the Time Trigger thread wait a little before cleaning up the window, e.g. by adding 500 ms to the window cleanup schedule? We thought of overriding WindowOperator.cleanupTime; is it possible to easily replace WindowOperator with our own?
3. Do you perhaps have a different idea for working around it?

Thanks!
Shay
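P.S. To make the interleaving concrete, here is a toy, single-threaded Java model of the timeline above. It is not Flink code: ToyWindowOperator, assignWindow, aggregate and clearExpiredWindows are hypothetical stand-ins, the reduce function is assumed to be a plain sum, and the two "threads" are modeled only by the order of the calls.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the race described above. NOT Flink code: all names here are
 * hypothetical stand-ins for WindowOperator.processElement and the
 * processing-time cleanup timer. Time units are logical, as in the timeline.
 */
public class RaceSketch {

    static final long WINDOW_SIZE = 60; // tumbling window of 60 logical time units

    static class ToyWindowOperator {
        // window start -> aggregated value (our "window state")
        final Map<Long, Long> state = new HashMap<>();

        // First half of processElement: choose the window from the current processing time.
        long assignWindow(long currentProcessingTime) {
            return currentProcessingTime - (currentProcessingTime % WINDOW_SIZE);
        }

        // Second half of processElement: add the element to the window state (reduce = sum).
        void aggregate(long windowStart, long value) {
            state.merge(windowStart, value, Long::sum);
        }

        // Cleanup timer: drop the state of every window that has ended by 'time'.
        void clearExpiredWindows(long time) {
            state.keySet().removeIf(start -> start + WINDOW_SIZE <= time);
        }
    }

    /** Returns the final state of window [60, 120] for the given interleaving. */
    public static long run(boolean timerFiresBetweenSteps) {
        ToyWindowOperator op = new ToyWindowOperator();
        op.aggregate(60, 100);              // previous aggregate of window [60, 120]

        long window = op.assignWindow(119); // element arrives at time 119 -> window starting at 60
        if (timerFiresBetweenSteps) {
            op.clearExpiredWindows(120);    // Time Trigger thread: time is 120, clear window state
        }
        op.aggregate(window, 5);            // Reducer thread: add the element value 5
        return op.state.get(window);
    }

    public static void main(String[] args) {
        System.out.println("expected order: " + run(false)); // 105
        System.out.println("racy order:     " + run(true));  // 5, previous aggregate lost
    }
}
```

Running main prints 105 for the order we expected and 5 for the racy order: the previous aggregate of 100 is gone and the window state restarts from the element's value, which is exactly what we observe.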