Hi Aljoscha,
I'm in favor of option 2: keep setStreamTimeCharacteristic() to set the
default time behavior, and add a method to the operators for setting a
custom time behavior.
The problem is illustrated by SlidingTimeWindows:
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return ProcessingTimeTrigger.create();
        } else {
            return EventTimeTrigger.create();
        }
    }
That just needs to be fixed, e.g. by having a dedicated
setTimeCharacteristic(..) on the operator that overrides the
environment-wide setting.
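A minimal sketch of how such an operator-level setting could drive both window assignment and the default trigger, so that the two can no longer disagree. All types and methods here (Environment, TumblingWindowSketch, setTimeCharacteristic, windowStart) are hypothetical stand-ins, not Flink's actual API, and triggers are represented only by their names.

```java
import java.util.Optional;

// Hypothetical model of "option 2": a job-wide default time characteristic plus an
// optional per-operator override. None of these types are Flink's real classes.
final class TimeCharacteristicSketch {

    enum TimeCharacteristic { ProcessingTime, EventTime }

    // Stand-in for the environment: holds only the job-wide default.
    static final class Environment {
        // The initial value is arbitrary for this sketch.
        private TimeCharacteristic characteristic = TimeCharacteristic.ProcessingTime;

        void setStreamTimeCharacteristic(TimeCharacteristic c) { characteristic = c; }

        TimeCharacteristic getStreamTimeCharacteristic() { return characteristic; }
    }

    // Stand-in for a tumbling-window operator with a per-operator override.
    static final class TumblingWindowSketch {
        private final long sizeMillis;
        private Optional<TimeCharacteristic> override = Optional.empty();

        TumblingWindowSketch(long sizeMillis) { this.sizeMillis = sizeMillis; }

        // Hypothetical operator-level setter, as proposed in this thread.
        TumblingWindowSketch setTimeCharacteristic(TimeCharacteristic c) {
            override = Optional.of(c);
            return this;
        }

        // The operator-level setting wins; otherwise fall back to the environment default.
        TimeCharacteristic effective(Environment env) {
            return override.orElse(env.getStreamTimeCharacteristic());
        }

        // Window assignment uses the clock that matches the effective characteristic...
        long windowStart(Environment env, long eventTimestamp, long processingTimestamp) {
            long ts = effective(env) == TimeCharacteristic.EventTime
                    ? eventTimestamp
                    : processingTimestamp;
            return ts - Math.floorMod(ts, sizeMillis);
        }

        // ...and so does the default trigger, mirroring getDefaultTrigger() above.
        String defaultTriggerName(Environment env) {
            return effective(env) == TimeCharacteristic.ProcessingTime
                    ? "ProcessingTimeTrigger"
                    : "EventTimeTrigger";
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        TumblingWindowSketch quick = new TumblingWindowSketch(5_000)
                .setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        TumblingWindowSketch slow = new TumblingWindowSketch(5_000); // uses the default

        // An element with event time 3_000 ms that arrives at processing time 12_000 ms.
        System.out.println(quick.defaultTriggerName(env) + " " + quick.windowStart(env, 3_000, 12_000));
        System.out.println(slow.defaultTriggerName(env) + " " + slow.windowStart(env, 3_000, 12_000));
    }
}
```

With the environment set to event time, the "quick" operator from the quickResults/slowResults example in the quoted mail would both assign and fire by processing time (window starting at 10000), while the "slow" one keeps the event-time default (window starting at 0).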
+1 for removing AbstractTime, EventTime, and ProcessingTime.
Cheers,
Max
On Wed, Dec 16, 2015 at 3:26 PM, Aljoscha Krettek wrote:
> Hi,
> I thought a bit about how to improve the handling of time in Flink, mostly as
> it relates to windows. The problem is that mixing processing-time and
> event-time windows in one topology is very hard, if not impossible, right
> now. Let me explain it with this example:
>
> val env: StreamExecutionEnvironment = …
>
> env.setStreamTimeCharacteristic(EventTime)
>
> val input = <some stream>
>
> val quickResults = input
>   .keyBy(…)
>   .window(TumblingTimeWindows.of(Time.seconds(5)))
>   .trigger(ProcessingTimeTrigger.create())
>   .sum(1)
>
> val slowResults = input
>   .keyBy(…)
>   .window(TumblingTimeWindows.of(Time.seconds(5)))
>   // .trigger(EventTimeTrigger.create()) is the default trigger, so there is no need to set it
>   .sum(1)
>
> The idea is that you want to have fast, but possibly inaccurate, results
> using processing time and correct, but maybe slower, results using event-time
> windowing.
>
> The difficulty is that the current API tries to solve two problems at once:
> 1. We want a way to just say “time window” and let the system
> instantiate the correct window operator based on the time characteristic.
> 2. We want the flexibility to let users mix and match
> processing-time and event-time windows.
>
> The above example does not work because both operators would assign elements
> to windows based on their event-time timestamps. The first operator therefore
> fires event-time windows based on processing time, which leads to unexpected
> (wrong) results.
>
> I see three solutions to this:
> 1. Remove setStreamTimeCharacteristic() and let users always specify exactly
> what kind of windowing they want.
> 2. Keep setStreamTimeCharacteristic(), but only use it to decide on the window
> operator for the special .timeWindow() call. Have two different window
> assigners per window type (TumblingTimeWindows, SlidingTimeWindows,
> SessionWindows, ...), one for processing time and one for event time, so that
> users can specify exactly what they want.
> 3. Keep setStreamTimeCharacteristic() and have three window assigners per
> window type: one for processing time, one for event time, and one that
> automatically decides based on the time characteristic.
>
> What do you think?
>
> On a side note, I would also suggest removing AbstractTime, EventTime, and
> ProcessingTime and keeping only Time for specifying time.
>
> Cheers,
> Aljoscha