I have the following use case: Input stream of timestamped "on" and "off" events received out of order. I need to produce an event with time that system was "on" every 15 minutes. Events should be produced only for intervals that system was "on".
When 15 minute window has at least one record it is triggered and the required aggregate is created, but when no event is received within 15 minute period window is not triggered and nothing is produced. I understand that it is not feasible to trigger on empty windows when the set of keys is unbounded. But it would be nice to give the control for such triggering to a window function. In my case the window function could enable the empty triggering for the current key when the last event in the evaluated window is "on" and disable it if is "off". The strawman API for such feature: public void apply(String key, TimeWindow window, Iterable<OnOffEvent> input, Collector<Aggregate> out) throws Exception { ... RuntimeContext context = this.getRuntimeContext(); if (lastEvent.isOn()) { context.enableEmptyWindowTriggering(); } else { context.disableEmptyWindowTriggering(); } } I could implement the same logic using global window and custom trigger and evictor, but it looks like ugly workaround to me. Is there any better way to solve this use case? Thanks, Maxim.