Hi all,

Based on Aljoscha's advice in this thread, I'm trying to implement a 
ProcessFunction that simulates the original sliding window (still using Flink 
1.2.1). 
My current setup is as follows for a window that is windowWidth wide and slides 
every windowSlide:

- Keep a ListState<Tuple2<Long, InputType>>
- processElement adds the incoming element together with context.timestamp() to 
the list state
- If no timer was started yet (a flag in the function instance), call 
registerEventTimeTimer(context.timestamp() + windowWidth) and set the flag to 
true:

       if (!timerStarted) {
           ctx.timerService().registerEventTimeTimer(timestamp + windowWidthMs); // timestamp = context.timestamp()
           timerStarted = true;
       }
- onTimer gets the list, restricts it to the elements whose timestamps are at 
most windowWidth in the past (relative to the timer's currentTime), and passes 
them on to a wrapped function that implements my 'domain logic'. The result of 
that function is passed on to the collector. After that, a new timer is 
registered at currentTime + windowSlide.

I’m now testing this function using a LocalEnvironment and a stream 
(fromCollection) of increasing longs, with the timestamp being the value of the 
long. The topology is: 
streamOfLongs
  .map(l -> Long.toString(l)) // Needed because myProcessFunction operates on 
Strings. It also triggers parallel operation.
  .keyBy(x -> 0)
  .process(myProcessFunction)

What I’m now running into is that the first element passed to myProcessFunction 
is not always the first element in the stream (unless I set the parallelism of 
the operators to 1), but I need the timestamp of the first element to start off 
my timer chain. 
Is there a way around this? The only solution I see right now is to change the 
onTimer call to extract all windows (from the beginning) from the state and 
then clean up the ListState, removing elements that are no longer part of any 
future window (based on the watermark). However, this feels a bit clunky, and 
might still lead to duplicate 'windows' without some extra bookkeeping. 
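
For reference, the watermark-based cleanup I have in mind would look roughly like this (again a stand-alone sketch with made-up names, not Flink API). The assumption is that an element with timestamp ts can still land in a future window only if some window ending after the watermark can contain it, i.e. ts > watermark - windowWidth, given half-open windows [end - windowWidth, end):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Flink API): drop buffered entries that can no
// longer fall into any window ending after the current watermark. An entry
// survives only if its timestamp is strictly greater than
// watermark - windowWidth.
public class StateCleanup {

    public static <T> List<SimpleEntry<Long, T>> prune(
            List<SimpleEntry<Long, T>> buffer, long watermark, long windowWidth) {
        List<SimpleEntry<Long, T>> kept = new ArrayList<>();
        for (SimpleEntry<Long, T> entry : buffer) {
            if (entry.getKey() > watermark - windowWidth) {
                kept.add(entry);
            }
        }
        return kept;
    }
}
```

The pruned list would then be written back to the ListState, which is the bookkeeping that makes this approach feel clunky.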

BTW, it seems that the ProcessFunction example in the docs (of Flink 1.3.0) 
also has this problem, since it sets the state to refer to the last processed 
element, but if elements are processed in parallel, they do not arrive in 
order, so the last processed element might not be the most recent element.


Thanks,
Carst


