Hi,
I was recently working on a problem where we wanted to implement a simple count on a sliding window, e.g. "how many messages of a certain type were emitted by a certain type of sensor in the last n minutes". This sounds simple enough in theory:

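import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

messageStream
    .map(msg => (msg.emitterType + msg.messageType, 1)) // key = EmitterType + MessageType (field names hypothetical), value = 1
    .keyBy(_._1)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
    .reduce((x, y) => (x._1, x._2 + y._2)) // sum the 1s per key and window
    .addSink(...)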

But there is a tricky edge case: the downstream systems will never know when the count for a certain key goes back to 0, which is important for our use case. The technical reason is that Flink only creates a window when an element is assigned to it, i.e. a window with count 0 doesn't exist in Flink.
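The edge case can be made concrete with a small plain-Scala model (no Flink dependency; `SlidingWindowModel`, `windowsFor` and `counts` are hypothetical names). It assumes non-negative timestamps in seconds and a window offset of 0, and assigns each message to every window that contains it, aligning window starts the way Flink's sliding assigners do. Because windows are only ever created from incoming messages, a slide interval without messages simply produces no count:

```scala
// Plain-Scala model (no Flink dependency) of how a sliding window assigner
// maps each message timestamp to the windows that contain it.
// Assumptions: timestamps in seconds, t >= 0, window offset 0.
object SlidingWindowModel {
  final case class Window(start: Long, end: Long)

  // All windows [start, start + size) containing t: start at the last
  // multiple of `slide` <= t and step back by `slide` while the window
  // still covers t (i.e. while start > t - size).
  def windowsFor(t: Long, size: Long, slide: Long): Seq[Window] = {
    val lastStart = t - (t % slide)
    (lastStart until (t - size) by -slide).map(start => Window(start, start + size))
  }

  // Count per window. A window only exists if some message was assigned to
  // it, so an interval without messages never shows up with a count of 0.
  def counts(timestamps: Seq[Long], size: Long, slide: Long): Map[Window, Int] =
    timestamps
      .flatMap(t => windowsFor(t, size, slide))
      .groupBy(identity)
      .map { case (window, hits) => window -> hits.size }

  def main(args: Array[String]): Unit = {
    // Two messages at t = 10 s and t = 20 s; 60 s windows sliding every 30 s.
    counts(Seq(10L, 20L), size = 60L, slide = 30L).toSeq
      .sortBy(_._1.start)
      .foreach(println)
  }
}
```

In the demo, only the windows [-30, 30) and [0, 60) appear; the window [30, 90), whose count would be 0, is never created.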

We came up with the following solution for the time being:

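import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger

messageStream
    .keyBy(msg => msg.emitterType + msg.messageType) // key = EmitterType + MessageType (field names hypothetical)
    .window(GlobalWindows.create())
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
    .evictor(new CustomEvictor) // custom: evicts all messages older than n minutes BEFORE the window is processed
    .process(new CustomCounter) // custom: counts all messages in the window state
    .addSink(...)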

If there were no messages in the last n minutes, all messages are evicted from the window and the process function is triggered one last time on the now-empty window, so we can produce a count of 0.
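The workaround can be modelled the same way for a single key. This is a sketch in plain Scala, not the actual CustomEvictor/CustomCounter; `EvictThenCount` is a hypothetical name, timestamps are in seconds, and it assumes (as described above) that a window whose buffer is already empty no longer fires:

```scala
// Plain-Scala model (no Flink dependency) of the GlobalWindows + evictor +
// counter workaround for a single key. Timestamps are in seconds.
// Assumption: a window whose buffer is already empty does not fire at all.
final class EvictThenCount(windowSize: Long) {
  // Every message timestamp is kept, mirroring the buffered window state.
  private var buffer: Vector[Long] = Vector.empty

  def add(timestamp: Long): Unit = { buffer = buffer :+ timestamp }

  // One trigger firing at time `now`. Returns None if nothing was buffered;
  // otherwise evicts timestamps <= now - windowSize and returns the number
  // of messages left, which may be 0.
  def fire(now: Long): Option[Int] =
    if (buffer.isEmpty) None
    else {
      buffer = buffer.filter(_ > now - windowSize) // "evictor": checks every element
      Some(buffer.size)                           // "counter": counts what is left
    }
}

object EvictThenCount {
  def main(args: Array[String]): Unit = {
    val counter = new EvictThenCount(windowSize = 120L) // n = 2 minutes
    counter.add(10L)
    counter.add(40L)
    // Trigger firings every 30 s from t = 60 to t = 210.
    (60L to 210L by 30L).foreach(now => println(s"t=$now -> ${counter.fire(now)}"))
  }
}
```

Firing every 30 seconds from t = 60 to t = 210 yields 2, 2, 2, 1, then the final 0, and after that nothing. The buffer grows with every message rather than holding a single count.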

I have two problems with this solution, though:
1) It is computationally inefficient for a simple count: a ProcessWindowFunction (and any window with an evictor) has to keep every message in state instead of a single running count. On top of that, every trigger touches all elements twice: once to compare the timestamp and once to count.
2) It seems like a very roundabout solution to a simple problem.
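To put point 1 in numbers, here is a hypothetical plain-Scala sketch (not a Flink API) of per-key state that does not grow with the number of messages: one counter per 30-second slot, kept in a ring buffer, so a 2-minute window needs four longs per key. `BucketedCount` is an invented name; it assumes non-negative, roughly ordered timestamps in seconds and a window size that is a multiple of the slide:

```scala
// Hypothetical comparison (plain Scala, not a Flink API): instead of buffering
// every message, keep one counter per `slide`-sized bucket, so per-key state is
// windowSize / slide longs regardless of how many messages arrive.
// Assumptions: timestamps in seconds, non-negative and roughly in order;
// windowSize is a multiple of slide.
final class BucketedCount(windowSize: Long, slide: Long) {
  require(windowSize % slide == 0, "windowSize must be a multiple of slide")

  private val numBuckets = (windowSize / slide).toInt
  private val buckets = new Array[Long](numBuckets) // ring buffer of per-slot counts
  private var currentSlot = 0L                      // newest slot seen, in units of `slide`

  // Move the ring forward to `slot`, zeroing the buckets that the new slots
  // reuse (their old slots are now outside the window; at most numBuckets).
  private def advanceTo(slot: Long): Unit =
    if (slot > currentSlot) {
      val stale = math.min(slot - currentSlot, numBuckets.toLong)
      for (k <- 1L to stale) buckets(((currentSlot + k) % numBuckets).toInt) = 0L
      currentSlot = slot
    }

  // O(1) per message: increment the bucket of the message's slot, ignoring
  // messages so late that their slot has already left the window.
  def add(timestamp: Long): Unit = {
    val slot = timestamp / slide
    advanceTo(slot)
    if (slot > currentSlot - numBuckets) buckets((slot % numBuckets).toInt) += 1L
  }

  // Count for the window ending in the slot that contains `now`. Always returns
  // a value, so an empty window yields 0 instead of no output.
  def count(now: Long): Long = {
    advanceTo(now / slide)
    buckets.sum
  }
}

object BucketedCount {
  def main(args: Array[String]): Unit = {
    val counter = new BucketedCount(windowSize = 120L, slide = 30L) // n = 2 minutes
    counter.add(10L)
    counter.add(40L)
    Seq(60L, 130L, 160L, 300L).foreach(now => println(s"t=$now -> ${counter.count(now)}"))
  }
}
```

`count` always returns a value, so an empty window yields 0 instead of no output. The trade-off is that the window is aligned to slide boundaries, as Flink's sliding windows are, rather than being exactly the last n minutes.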

So I was wondering whether there is a more efficient or more "Flink-like" approach to this. Sorry for the long write-up, but I would love to hear your takes.


Best regards
Jan

--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Managing Directors: Thomas Gebauer, Jan Zander
Register court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501
