I have the following use case:

The input is a stream of timestamped "on" and "off" events that arrive out
of order. Every 15 minutes I need to produce an event containing the time
the system was "on" during that interval. Events should be produced only
for intervals in which the system was "on".
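
To make the expected output concrete, here is a rough plain-Java sketch of
the aggregate I am after, computed in batch over a finished list of events
for a single key. It does not use Flink at all; the class name
OnTimePerWindow, the field names, and the endTime parameter are made up for
illustration, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: plain Java, no Flink. All names here are hypothetical.
final class OnTimePerWindow {
    static final long WINDOW_MS = 15 * 60 * 1000L;

    static final class OnOffEvent {
        final long timestamp; // event time in ms, assumed non-negative
        final boolean on;
        OnOffEvent(long timestamp, boolean on) {
            this.timestamp = timestamp;
            this.on = on;
        }
    }

    static final class Aggregate {
        final long windowStart; // start of the 15-minute interval in ms
        final long onMillis;    // time the system was "on" in that interval
        Aggregate(long windowStart, long onMillis) {
            this.windowStart = windowStart;
            this.onMillis = onMillis;
        }
    }

    /** Returns one aggregate per 15-minute interval before endTime with "on" time > 0. */
    static List<Aggregate> aggregate(List<OnOffEvent> events, long endTime) {
        // Events arrive out of order, so sort a copy by event time first.
        List<OnOffEvent> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((OnOffEvent e) -> e.timestamp));
        List<Aggregate> out = new ArrayList<>();
        if (sorted.isEmpty()) {
            return out;
        }
        // Align the first interval to a 15-minute boundary.
        long start = sorted.get(0).timestamp / WINDOW_MS * WINDOW_MS;
        boolean on = false; // state carried across intervals
        int i = 0;
        for (; start < endTime; start += WINDOW_MS) {
            long end = Math.min(start + WINDOW_MS, endTime);
            long cursor = start;
            long onMillis = 0;
            while (i < sorted.size() && sorted.get(i).timestamp < end) {
                OnOffEvent e = sorted.get(i++);
                if (on) {
                    onMillis += e.timestamp - cursor;
                }
                cursor = e.timestamp;
                on = e.on;
            }
            if (on) {
                onMillis += end - cursor; // still "on" at the end of the interval
            }
            if (onMillis > 0) {
                out.add(new Aggregate(start, onMillis)); // also covers intervals with no events
            }
        }
        return out;
    }
}
```

For example, with "on" at minute 5 and "off" at minute 40, the intervals
starting at minutes 0, 15 and 30 should produce 10, 15 and 10 minutes
respectively, even though the middle interval contains no events at all.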

When a 15-minute window contains at least one record, it is triggered and
the required aggregate is created. But when no event is received within a
15-minute period, the window is not triggered and nothing is produced, even
if the system was "on" for the whole period.

I understand that it is not feasible to trigger on empty windows when the
set of keys is unbounded. But it would be nice to give control over such
triggering to the window function. In my case, the window function could
enable empty-window triggering for the current key when the last event in
the evaluated window is "on" and disable it when it is "off".

A strawman API for such a feature:

public void apply(String key, TimeWindow window, Iterable<OnOffEvent> input,
                  Collector<Aggregate> out) throws Exception {
    ...
    RuntimeContext context = this.getRuntimeContext();
    // enableEmptyWindowTriggering() and disableEmptyWindowTriggering() are
    // the proposed methods; they are not part of RuntimeContext today.
    if (lastEvent.isOn()) {
        context.enableEmptyWindowTriggering();
    } else {
        context.disableEmptyWindowTriggering();
    }
}
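
To spell out the semantics I have in mind for the strawman, here is a small
plain-Java sketch with no Flink dependency. The class EmptyWindowToggle and
its methods are hypothetical names used only for illustration. The point is
that state is kept only for keys whose last event was "on", so the set of
tracked keys stays bounded by the number of systems currently "on".

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustration only: models the proposed per-key toggle, not a Flink API.
final class EmptyWindowToggle {
    // Keys for which empty windows should still fire.
    private final Set<String> enabledKeys = new HashSet<>();

    /**
     * Decides whether the window for the given key fires.
     * onFlagsInWindow holds the on/off flags of the window's events in event-time order.
     */
    boolean shouldFire(String key, List<Boolean> onFlagsInWindow) {
        if (onFlagsInWindow.isEmpty()) {
            // Empty window: fire only if the previous window left the key enabled.
            return enabledKeys.contains(key);
        }
        boolean lastIsOn = onFlagsInWindow.get(onFlagsInWindow.size() - 1);
        if (lastIsOn) {
            enabledKeys.add(key);    // corresponds to enableEmptyWindowTriggering()
        } else {
            enabledKeys.remove(key); // corresponds to disableEmptyWindowTriggering()
        }
        return true; // non-empty windows always fire
    }

    /** Number of keys that currently have empty-window triggering enabled. */
    int trackedKeys() {
        return enabledKeys.size();
    }
}
```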

I could implement the same logic using a global window with a custom trigger
and evictor, but that looks like an ugly workaround to me.

Is there any better way to solve this use case?

Thanks,

Maxim.
