Hi,
I was recently working on a problem where we wanted to implement a
simple count over a sliding window, e.g. "how many messages of a certain
type were emitted by a certain type of sensor in the last n minutes".
In theory, this sounds simple enough:
    messageStream
      .keyBy(/* EmitterType + MessageType */)
      .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
      .aggregate(/* count: add 1 per message */)
      .addSink(...)
But there is a tricky edge case: the downstream systems will never learn
when the count for a certain key goes back to 0, which is important for
our use case. The technical reason is that Flink doesn't create a window
if there are no elements for it, i.e. a window with count 0 doesn't exist
in Flink.
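To make the edge case concrete, here is a minimal plain-Scala sketch of the behaviour as I understand it. It does not use Flink at all; `SlidingCountModel`, `windowStarts` and `counts` are made-up names, and timestamps are assumed to be non-negative milliseconds.

```scala
// Plain-Scala model of sliding-window counting; no Flink dependency.
// All names here are hypothetical and only serve as illustration.
object SlidingCountModel {
  // Start timestamps of all sliding windows [start, start + size) that contain ts.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    (lastStart to (ts - size + 1) by -slide).toList
  }

  // Count per (key, windowStart). Only windows that received at least one
  // element appear in the result; nothing ever produces an entry with count 0.
  def counts(events: Seq[(String, Long)], size: Long, slide: Long): Map[(String, Long), Int] =
    events
      .flatMap { case (key, ts) => windowStarts(ts, size, slide).map(start => (key, start)) }
      .groupBy(identity)
      .map { case (keyAndStart, hits) => keyAndStart -> hits.size }
}
```

With a single message at t = 10 s, a window size of 60 s and a slide of 30 s, the model yields counts for the windows starting at -30 s and 0 s only. The window starting at 30 s has no entry at all, so downstream never sees the count drop to 0.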
We came up with the following solution for the time being:
    messageStream
      .keyBy(/* EmitterType + MessageType */)
      .window(GlobalWindows.create())
      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
      .evictor(/* CustomEvictor: evict all messages older than n minutes
                  BEFORE processing the window */)
      .process(/* CustomCounter: count all messages in window state */)
      .addSink(...)
If there were no messages in the last n minutes, all messages are
evicted from the window and the process function is triggered one
last time on the now-empty window, so we can emit a count of 0.
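The evict-then-count step on each trigger firing can be sketched roughly as follows. This is again plain Scala rather than our actual evictor and process function; `EvictThenCount` and `fire` are hypothetical names, and the buffer simply holds message timestamps in milliseconds.

```scala
// Plain-Scala model of "evict first, then count"; no Flink dependency.
// All names here are hypothetical and only serve as illustration.
object EvictThenCount {
  // One trigger firing: drop every timestamp older than maxAge relative to
  // `now`, then count what is left. Returns the retained buffer and the count.
  def fire(buffer: Vector[Long], now: Long, maxAge: Long): (Vector[Long], Int) = {
    // Eviction step: every buffered element is inspected once.
    val retained = buffer.filter(ts => ts >= now - maxAge)
    // Counting step: in the real job the process function iterates the
    // retained elements again; here the size of the Vector is used.
    (retained, retained.size)
  }
}
```

For example, with n = 2 minutes and messages at 10 s and 40 s, firing at 60 s gives a count of 2, firing at 150 s gives 1, and firing at 180 s leaves an empty buffer and a count of 0.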
I have two problems with this solution, though:
1) It is computationally inefficient for a simple count, as a custom
process function always keeps all messages in state. In addition, on
every trigger all elements have to be touched twice: once to compare the
timestamp and once to count.
2) It seems like a very roundabout solution to a simple problem.
So I was wondering whether there is a more efficient or more "Flink-like"
approach to this. Sorry for the long write-up, but I would love to hear
your thoughts.
Best regards
Jan
--
neuland – Büro für Informatik GmbH