Hi,

We seem to be hitting a race condition between the aggregation thread and 
the Time Trigger thread.
It might not be a bug, but it still seems strange to us, and we would like your 
help fixing it or working around it, please.

First, a few details about our use case and system:

·         We are working with processing time.

·         We are using Flink 1.4.

·         We use our own customized sliding window with a size of 1 minute and 
a slide of 10 seconds.
But we think this can also happen with a tumbling window, so for simplicity 
let's assume a tumbling window of 1 minute.

·         Our window Trigger returns FIRE for every element.

·         We have a constant incoming rate of 2k messages/sec, evenly balanced.

·         When I say "window state", I simply mean our aggregation value in it.
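
To make the window assignment concrete, here is a minimal plain-Java sketch of how a timestamp maps to tumbling or sliding windows. It is not our customized assigner; the class and method names are made up for illustration, the arithmetic only follows the usual "align the start to the slide" approach, and it assumes non-negative timestamps and half-open windows (end exclusive).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only (not Flink code).
class WindowAssignSketch {

    // Start of the most recent window that contains `timestamp`,
    // assuming windows are aligned to multiples of `slide` (offset 0).
    public static long windowStart(long timestamp, long slide) {
        return timestamp - (timestamp % slide);
    }

    // All [start, end) windows of length `size` that contain `timestamp`.
    // With size == slide this degenerates to a single tumbling window.
    public static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Logical time 119, size 60, slide 10: six windows, the oldest starting at 60.
        for (long[] w : assignWindows(119, 60, 10)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With size 60 and slide 60 (the simplified tumbling case), time 119 maps to the single window starting at 60, which is the window used in the story below.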

If the timestamp of an element is very close to the end of the window, it is 
of course assigned to that window, but occasionally the window times out and 
is cleared before the element has been aggregated into the window state. 
We then lose the previous aggregation value and get a new aggregation state 
containing only the element's value.

Below is the story as seen by the threads.
Timestamps are logical.

Suppose we are at the beginning of WindowOperator.processElement.
Current time: 119 (nearly 120)

Reducer thread                          | Time Trigger thread
----------------------------------------+-----------------------------------
Assign element to window [60, 120],     |
because                                 |
context.getCurrentProcessingTime()      |
returned 119 (in assignWindows)         |
                                        | Time is 120 --> clear window state
Add the element value to window state   |
[60, 120] (it starts from new state)    |

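To make the ordering above concrete, here is a minimal single-threaded replay of it in plain Java. It is not Flink code and does not prove that Flink allows this interleaving; the class, the method names, and the values 41 and 1 are made up for illustration. It only shows why the aggregation restarts if the clear lands between the assignment and the add.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a window operator, for illustration only.
class WindowRaceReplay {

    // Window state: window start -> running sum (stand-in for our aggregation).
    private final Map<Long, Long> state = new HashMap<>();

    // Reducer, step 1: pick the tumbling window (size 60) from the current time.
    public long assign(long now) {
        return now - (now % 60);
    }

    // Reducer, step 2: merge the element into the chosen window's state
    // and return the new aggregate.
    public long add(long windowStart, long value) {
        return state.merge(windowStart, value, Long::sum);
    }

    // Timer callback: the window timed out, so drop its state.
    public void clear(long windowStart) {
        state.remove(windowStart);
    }

    // Replays the suspected ordering step by step and returns the final aggregate.
    public static long replay() {
        WindowRaceReplay op = new WindowRaceReplay();
        op.add(60, 41);               // earlier elements: state of window 60 is 41
        long window = op.assign(119); // reducer: time is 119, window 60 chosen
        op.clear(60);                 // timer: time is 120, window state cleared
        return op.add(window, 1);     // reducer: add starts from an empty state
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints 1 rather than the expected 42
    }
}
```
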

Our questions:

1.       Is this a legitimate race? We expected that (1) assigning an element to 
a window and aggregating it into its state, and (2) clearing the window, would 
be atomic with respect to each other. That is, if an element is valid for a 
window, it is assigned to it and fully aggregated into its state, and only 
then can the window be cleared.

2.       How could we make the Time Trigger thread wait a little before 
cleaning the window, for example by adding 500 ms to the scheduled cleanup time?
We thought of overriding WindowOperator.cleanupTime. Is it possible to easily 
replace WindowOperator with our own?

3.       Do you perhaps have a different idea for working around it?
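
To illustrate what we have in mind in question 2, here is a rough plain-Java sketch of a cleanup time with a grace period. The class, the method, and the `graceMillis` parameter are hypothetical and are not part of WindowOperator; the sketch assumes a non-negative grace period and that the window's maximum timestamp is its last valid millisecond.

```java
// Hypothetical helper, for illustration only (not Flink code).
class GraceCleanupSketch {

    // Time at which the window state would be cleared: the window's last
    // valid timestamp plus a grace period.
    public static long cleanupTime(long windowMaxTimestamp, long graceMillis) {
        long t = windowMaxTimestamp + graceMillis;
        // If the addition overflowed (e.g. a window that never ends),
        // fall back to "never clean up".
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Window covering 60 s to 120 s in milliseconds, 500 ms grace period.
        System.out.println(cleanupTime(119_999L, 500L));
    }
}
```

The open question remains whether such a delay can be plugged in without replacing WindowOperator itself.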

Thanks!
Shay
