Hi Shay,

I would suggest trying Allowed Lateness, for example with the 500 ms you mention:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/windows.html#allowed-lateness
 
It might also work for processing time.
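
In the DataStream API the setting itself is `.allowedLateness(Time.milliseconds(500))` on the windowed stream. To show why a delay helps, here is a minimal toy model in plain Java (not Flink code). It assumes the window state is cleared at the window end plus a delay, which stands for allowed lateness or the extra 500 ms you describe. `CleanupDelaySketch`, `cleanupTime` and `stateStillPresent` are hypothetical names. Whether Flink applies allowed lateness to processing-time windows is an assumption worth checking in WindowOperator.cleanupTime for your Flink version.

```java
/**
 * Toy model (NOT Flink code) of when a window's state is cleared.
 * Assumption: the state of window [start, end) is cleared at end + delay,
 * where delay stands for allowed lateness or the extra 500 ms discussed here.
 */
final class CleanupDelaySketch {

    // Hypothetical helper: logical time at which the window state is cleared.
    static long cleanupTime(long windowEnd, long delayMs) {
        return windowEnd + delayMs;
    }

    // Hypothetical helper: true if an element aggregated at aggregateTime
    // still finds the window state intact, i.e. cleanup has not run yet.
    static boolean stateStillPresent(long windowEnd, long delayMs, long aggregateTime) {
        return aggregateTime < cleanupTime(windowEnd, delayMs);
    }

    public static void main(String[] args) {
        long windowEnd = 120_000L;     // window [60_000, 120_000), in ms
        long aggregateTime = 120_001L; // reducer gets to aggregate 1 ms after the end

        // No delay: the state was already cleared, so the previous sum would be lost.
        System.out.println("no delay:     " + stateStillPresent(windowEnd, 0L, aggregateTime));
        // 500 ms delay: the element is still aggregated into the existing state.
        System.out.println("500 ms delay: " + stateStillPresent(windowEnd, 500L, aggregateTime));
    }
}
```

In this model the delay narrows the race rather than removing it: an aggregation that runs more than 500 ms after the window end would still find a cleared state.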

Cheers,
Andrey

> On 18 Jul 2018, at 17:22, Shimony, Shay <shay.shim...@citi.com> wrote:
> 
> Hi,
>  
> It seems we are running into a race condition between the aggregation thread
> and the Time Trigger thread.
> It might not be a bug, but it still seems strange to us, and we would
> appreciate your help fixing or working around it.
>  
> First, a few details about our use case and system:
> · We are working with processing time.
> · We are using Flink 1.4.
> · We use a custom sliding window of size 1 minute with a 10-second slide.
>   We believe the issue can also occur with a tumbling window, so for
>   simplicity let's assume a tumbling window of 1 minute.
> · Our window Trigger returns FIRE on each element.
> · We have a constant, evenly distributed incoming rate of 2k messages/sec.
> · By “window state” I simply mean our aggregated value in the window.
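For reference, a sketch of how a processing-time timestamp maps to 1-minute windows sliding every 10 seconds. It follows the start-of-window arithmetic of Flink's built-in sliding assigners (TimeWindow.getWindowStartWithOffset with offset 0); the customized assigner described above may differ, and `SlidingAssignmentSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT Flink code) of sliding-window assignment, offset 0. */
final class SlidingAssignmentSketch {

    // Start of the window of the given length that contains timestamp.
    static long windowStart(long timestamp, long length) {
        return timestamp - (timestamp + length) % length;
    }

    // Returns {start, end} pairs of every window [start, end) containing timestamp.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 1-minute windows sliding every 10 s, element at t = 119 s (in ms).
        for (long[] w : assign(119_000L, 60_000L, 10_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For t = 119000 ms this yields six windows, from [110000, 170000) down to [60000, 120000); the last one ends only 1 s after the element. With size equal to slide the same code gives the single tumbling window [60000, 120000).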
>  
> If the timestamp of an element is very close to the end of the window, the
> element is of course assigned to that window, but it occasionally happens
> that the window times out and is cleared before the element is aggregated
> into the window state. As a result, we lose the previous aggregated value
> and get a new aggregation state containing only this element's value.
>  
> Below is the story as seen by the threads. 
> Timestamps are logical.
>  
> Suppose we are at the beginning of WindowOperator.processElement.
> Current time: 119 (nearly 120)
>  
> 1. Reducer thread: assign the element to window [60, 120], because
>    context.getCurrentProcessingTime() returned 119 (in assignWindows).
> 2. Time Trigger thread: time is 120 -> clear the window state.
> 3. Reducer thread: add the element value to window state [60, 120]
>    (it starts from a new state).
>  
>  
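The timeline above can be replayed in a small single-threaded Java model (not Flink's WindowOperator; all names are hypothetical). It assumes sum aggregation, 60-unit tumbling windows and a logical clock advanced by hand, so each interleaving is deterministic. The `assignAndAggregate` variant models the atomicity asked about in question 1: the clock read, assignment and aggregation hold the same monitor as the cleanup, so the cleanup can only run entirely before or after them.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy replay (NOT Flink's WindowOperator) of the timeline above.
 * Assumptions: 60-unit tumbling windows, sum as the aggregate, and a logical
 * clock advanced by hand so every interleaving is deterministic.
 */
final class WindowRaceSketch {
    static final long SIZE = 60; // tumbling window length in logical time units

    private final Map<Long, Long> sums = new HashMap<>(); // window start -> running sum
    private long now; // logical processing time

    WindowRaceSketch(long now) {
        this.now = now;
    }

    static long windowStart(long t) {
        return t - (t % SIZE); // valid for t >= 0
    }

    // Non-atomic step 1: read the clock and choose a window.
    synchronized long assign() {
        return windowStart(now);
    }

    // Non-atomic step 2: add the value to the chosen window's state.
    synchronized void aggregate(long start, long value) {
        sums.merge(start, value, Long::sum);
    }

    // Timer: advance the clock and clear every window whose end has passed.
    synchronized void advanceAndClear(long newNow) {
        now = newNow;
        sums.keySet().removeIf(start -> start + SIZE <= now);
    }

    // Atomic variant: clock read, assignment and aggregation hold the same
    // monitor as advanceAndClear, so cleanup runs entirely before or after.
    synchronized long assignAndAggregate(long value) {
        long start = assign();
        aggregate(start, value);
        return start;
    }

    synchronized Long sumOf(long start) {
        return sums.get(start); // null if the window has no state
    }

    public static void main(String[] args) {
        // Non-atomic, as in the timeline: [60, 120) already holds 10.
        WindowRaceSketch racy = new WindowRaceSketch(119);
        racy.aggregate(60, 10);
        long chosen = racy.assign(); // reducer: clock reads 119 -> window 60
        racy.advanceAndClear(120);   // timer: window [60, 120) is cleared
        racy.aggregate(chosen, 5);   // reducer: starts from a fresh state
        System.out.println("racy:          window 60 sum = " + racy.sumOf(60));

        // Atomic, element first: the element is counted before the cleanup.
        WindowRaceSketch elementFirst = new WindowRaceSketch(119);
        elementFirst.aggregate(60, 10);
        elementFirst.assignAndAggregate(5);
        System.out.println("element first: window 60 sum = " + elementFirst.sumOf(60));

        // Atomic, timer first: the clock already reads 120, so the next window is used.
        WindowRaceSketch timerFirst = new WindowRaceSketch(119);
        timerFirst.aggregate(60, 10);
        timerFirst.advanceAndClear(120);
        long start = timerFirst.assignAndAggregate(5);
        System.out.println("timer first:   window " + start + " sum = " + timerFirst.sumOf(start));
    }
}
```

In the racy replay window 60 ends up with a sum of 5, the lost-update symptom described above; in both atomic orders the element is either counted together with the earlier 10 or lands in the next window [120, 180).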
> Our questions:
> 1. Is this a legitimate race? We expected that (a) assigning an element to
>    a window and aggregating it into the window state, and (b) clearing the
>    window, would be atomic with respect to each other. That is, if an
>    element is valid for a window, it is assigned to that window and fully
>    aggregated into its state, and only then can the window be cleared.
> 2. How could we make the Time Trigger thread wait a little before cleaning
>    the window, e.g. by adding 500 ms to the window cleanup schedule?
>    We thought of overriding WindowOperator.cleanupTime; is it possible to
>    easily replace WindowOperator with our own implementation?
> 3. Maybe you have a different idea for working around it?
>  
> Thanks!
> Shay
