Hi,
I thought a bit about how to improve the handling of time in Flink, mostly as 
it relates to windows. The problem is that mixing processing-time and 
event-time windows in one topology is very hard (impossible) right now. Let me 
explain it with this example:

val env: StreamExecutionEnvironment = …

env.setStreamTimeCharacteristic(EventTime)

val input = <some stream>

val quickResults = input
  .keyBy(…)
  .window(TumblingTimeWindows.of(Time.seconds(5)))
  .trigger(ProcessingTimeTrigger.create())
  .sum(1)

val slowResults = input
  .keyBy(…)
  .window(TumblingTimeWindows.of(Time.seconds(5)))
  // .trigger(EventTimeTrigger.create()) is the default trigger,
  // so there is no need to set it
  .sum(1)

The idea is that you want to have fast, but possibly inaccurate, results using 
processing time and correct, but maybe slower, results using event-time 
windowing.

The problem is that the current API tries to solve two problems:
 1. We want to have a way to just say “time window” and then let the system 
instantiate the correct window-operator based on the time characteristic
 2. We want to have flexibility to allow users to mix ’n match processing-time 
and event-time windows

The above example does not work because, with the time characteristic set to 
EventTime, both operators assign elements to windows based on the event-time 
timestamp. The first operator therefore fires event-time windows according to 
the processing-time clock, which produces unexpected (wrong) results.
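
To make the mismatch concrete, here is a minimal sketch in plain Scala. It 
does not use the Flink API: Window and tumblingWindow are hypothetical 
stand-ins, and I assume tumbling windows aligned to multiples of the window 
size with non-negative millisecond timestamps. It only shows that the same 
element lands in different windows depending on which clock does the assigning:

```scala
object WindowMismatchSketch {
  // Hypothetical stand-in for a tumbling time window: [start, end) in milliseconds.
  final case class Window(start: Long, end: Long)

  // The tumbling window of the given size that contains `timestamp`.
  // Assumes non-negative timestamps and windows aligned to multiples of sizeMs.
  def tumblingWindow(timestamp: Long, sizeMs: Long): Window = {
    val start = timestamp - (timestamp % sizeMs)
    Window(start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    val sizeMs = 5000L
    val eventTimestamp = 3000L // when the event happened
    val arrivalTime = 17000L   // wall-clock time at which the operator sees it

    val byEventTime = tumblingWindow(eventTimestamp, sizeMs)
    val byProcessingTime = tumblingWindow(arrivalTime, sizeMs)

    println(s"assigned by event time:      $byEventTime")
    println(s"assigned by processing time: $byProcessingTime")
    // On the wall clock, the event-time window ended before the element even arrived.
    println(s"event-time window already over: ${byEventTime.end <= arrivalTime}")
  }
}
```

A trigger that watches the wall clock but is handed the event-time window is 
comparing two unrelated time axes, which is the situation quickResults ends up 
in.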

I see three solutions to this:
 1. Remove setStreamTimeCharacteristic() and let users always specify exactly 
what kind of windowing they want
 2. Keep setStreamTimeCharacteristic() but only employ the magic that decides 
on the window operator for the special .timeWindow() call. Have two window 
assigners per window type (TumblingTimeWindows, SlidingTimeWindows, 
SessionWindows, ...), one for processing time and one for event time, so that 
users can specify exactly what they want
 3. Keep setStreamTimeCharacteristic() and have three window assigners per 
window type, one for processing-time, one for event-time and one that 
automatically decides based on the time characteristic
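
As a rough illustration of what option 2 could look like, here is a sketch in 
plain Scala with no Flink dependency. All names in it 
(TumblingEventTimeWindows, TumblingProcessingTimeWindows, the timeWindow 
helper, the ByEventTime/ByProcessingTime markers) are hypothetical and only 
meant to show the shape of the idea, not a proposed final API. I only model 
two time characteristics and tumbling windows here:

```scala
object ExplicitAssignersSketch {
  // Hypothetical markers for the configured time characteristic.
  sealed trait Characteristic
  case object ByProcessingTime extends Characteristic
  case object ByEventTime extends Characteristic

  // Hypothetical stand-in for a time window: [start, end) in milliseconds.
  final case class Window(start: Long, end: Long)

  // An assigner fixes which clock places an element into a window.
  sealed trait TumblingAssigner {
    def sizeMs: Long
    def assign(elementTimestamp: Long, wallClock: Long): Window

    // Assumes non-negative timestamps and windows aligned to multiples of sizeMs.
    protected def windowFor(t: Long): Window = {
      val start = t - (t % sizeMs)
      Window(start, start + sizeMs)
    }
  }

  // Always uses the element's own timestamp, whatever the characteristic is.
  final case class TumblingEventTimeWindows(sizeMs: Long) extends TumblingAssigner {
    def assign(elementTimestamp: Long, wallClock: Long): Window =
      windowFor(elementTimestamp)
  }

  // Always uses the operator's wall clock, whatever the characteristic is.
  final case class TumblingProcessingTimeWindows(sizeMs: Long) extends TumblingAssigner {
    def assign(elementTimestamp: Long, wallClock: Long): Window =
      windowFor(wallClock)
  }

  // The only place where the time characteristic is consulted.
  def timeWindow(characteristic: Characteristic, sizeMs: Long): TumblingAssigner =
    characteristic match {
      case ByEventTime      => TumblingEventTimeWindows(sizeMs)
      case ByProcessingTime => TumblingProcessingTimeWindows(sizeMs)
    }

  def main(args: Array[String]): Unit = {
    val quick = TumblingProcessingTimeWindows(5000L) // explicit, fast results
    val slow = timeWindow(ByEventTime, 5000L)        // decided by the characteristic
    println(quick.assign(elementTimestamp = 3000L, wallClock = 17000L))
    println(slow.assign(elementTimestamp = 3000L, wallClock = 17000L))
  }
}
```

With something along these lines, the quickResults/slowResults example would 
name its assigner explicitly for each stream, and the time characteristic 
would only matter for the .timeWindow() shortcut.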

What do you think?

On a side note, I would also suggest removing AbstractTime, EventTime, and 
ProcessingTime and just keeping Time for specifying time.

Cheers,
Aljoscha
