Hi Yun,
Thanks for your reply.
I do agree with your point that standard windows are meant for high-level
operations, while the lower-level APIs offer a rich toolset for more
advanced use cases.
I have also tried to solve my problem with KeyedProcessFunctions, but was
not able to get it to work, for two reasons:
1) I was not able to set up a combination of ValueState, timers and
triggers that emulated a sliding window with a rising and falling count
(including 0) well enough.
2) Memory leak: state/windows should be cleared once a key has been at
count 0 for a certain time, in order to prevent an unbounded growth of
ValueStates that are no longer needed.
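One way to combine points 1) and 2) can be modelled in plain Scala, outside Flink. This is a minimal sketch under stated assumptions, not a tested Flink job. Per key, counts are kept in one bucket per slide (in a KeyedProcessFunction this would roughly correspond to a MapState from bucket start to count). `onSlide` stands in for a timer firing at every slide boundary, and a count of 0 is emitted explicitly. A key's state is dropped after `idleSlidesBeforeCleanup` consecutive zero counts, which in Flink would mean clearing the state and not registering another timer. `SlidingCounter` and all of its member names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of a sliding count with explicit zeros and
// state cleanup; not Flink code. Per key, one counter per slide-sized bucket
// (roughly what a MapState[Long, Long] would hold in a KeyedProcessFunction).
final class SlidingCounter(sizeMs: Long, slideMs: Long, idleSlidesBeforeCleanup: Int) {
  require(sizeMs % slideMs == 0, "size must be a multiple of slide")

  private val buckets = mutable.Map.empty[String, mutable.Map[Long, Long]]
  private val zeroStreak = mutable.Map.empty[String, Int]

  // Start of the slide-sized bucket a timestamp falls into.
  private def bucketOf(ts: Long): Long = ts - Math.floorMod(ts, slideMs)

  // processElement: increment the element's bucket for its key.
  def onElement(key: String, ts: Long): Unit = {
    val b = buckets.getOrElseUpdate(key, mutable.Map.empty[Long, Long])
    val start = bucketOf(ts)
    b(start) = b.getOrElse(start, 0L) + 1
  }

  // Stand-in for a timer firing at slide boundary `end`: emits the count for
  // [end - sizeMs, end) for every known key, including explicit zeros.
  def onSlide(end: Long): Map[String, Long] = {
    val out = Map.newBuilder[String, Long]
    for (key <- buckets.keys.toList) {
      val b = buckets(key)
      b --= b.keys.filter(_ < end - sizeMs).toList // drop buckets that slid out
      val total = b.iterator.collect { case (start, n) if start < end => n }.sum
      val streak = if (total > 0) 0 else zeroStreak.getOrElse(key, 0) + 1
      out += (key -> total)
      if (streak >= idleSlidesBeforeCleanup) {
        buckets -= key // comparable to state.clear() without a new timer
        zeroStreak -= key
      } else zeroStreak(key) = streak
    }
    out.result()
  }
}
```

Since only one counter per slide is kept, each firing touches on the order of size/slide buckets per key rather than every buffered message.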
Could you please elaborate, in pseudocode, on how you would envision your
solution?
Best regards
Jan
On 08.02.21 05:31, Yun Gao wrote:
Hi Jan,
In my view, windows in Flink are meant as a "high-level" operation for
certain kinds of aggregation. If they cannot satisfy the requirements, we
can always fall back to the "low-level" API, i.e. KeyedProcessFunction [1].
In this case, we could use a ValueState to store the current count for
each key and increment it on each element. We could also register a timer
for each key when the first element for that key arrives. In the onTimer
callback, we then emit the current state value, reset it to 0, and
register another timer for this key 30s later.
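The scheme above can be sketched as a small plain-Scala model that runs without Flink. It is a hypothetical illustration, not the Flink API. `PeriodicCounter`, `onElement` and `advanceTo` are invented names. One mutable map stands in for the per-key ValueState, and another stands in for the timers a KeyedProcessFunction would register through its TimerService.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model of the per-key logic; not Flink code.
// `count` plays the role of the ValueState, `timers` the role of timers
// registered with the TimerService.
final class PeriodicCounter(intervalMs: Long) {
  private val count = mutable.Map.empty[String, Long]
  private val timers = mutable.Map.empty[String, Long]

  // processElement: increment; register a timer on the first element of a key.
  def onElement(key: String, nowMs: Long): Unit = {
    count(key) = count.getOrElse(key, 0L) + 1
    if (!timers.contains(key)) timers(key) = nowMs + intervalMs
  }

  // Fires all timers due at or before nowMs, earliest first. Each firing
  // emits (timestamp, key, count), resets the count to 0 and registers the
  // next timer intervalMs later (the onTimer callback).
  def advanceTo(nowMs: Long): Vector[(Long, String, Long)] = {
    var out = Vector.empty[(Long, String, Long)]
    var due = timers.minByOption(_._2).filter(_._2 <= nowMs)
    while (due.isDefined) {
      val (key, ts) = due.get
      out = out :+ ((ts, key, count(key)))
      count(key) = 0L
      timers(key) = ts + intervalMs
      due = timers.minByOption(_._2).filter(_._2 <= nowMs)
    }
    out
  }
}
```

This scheme has two notable properties. Because the count is reset to 0 on every firing, it produces tumbling rather than sliding counts. And once a key has been seen, its timer keeps re-registering forever, which is the state-growth concern raised as point 2) in the reply at the top of this thread.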
Best,
Yun
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
------------------ Original Mail ------------------
Sender: Jan Brusch <jan.bru...@neuland-bfi.de>
Send Date: Sat Feb 6 23:44:00 2021
Recipients: user <user@flink.apache.org>
Subject: Sliding Window Count: Tricky Edge Case / Count Zero Problem
Hi,
I was recently working on a problem where we wanted to implement a
simple count on a sliding window, e.g. "How many messages of a certain
type were emitted by a certain type of sensor in the last n minutes?"
This sounds simple enough in theory:
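import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

messageStream
  // emitterType / messageType are placeholder field names for EmitterType + MessageType
  .map(msg => (msg.emitterType + msg.messageType, 1))
  .keyBy(_._1)
  .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
  .reduce((x, y) => (x._1, x._2 + y._2))
  .addSink(...)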
But there is a tricky edge case: the downstream systems will never know
when the count for a certain key goes back to 0, which is important for
our use case. The technical reason is that Flink only creates a window
when an element is assigned to it, i.e. a window with count 0 never
exists in Flink, so nothing is emitted for it.
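The reason can be made concrete with a small plain-Scala model. It is a sketch, not Flink code. `windowsFor` follows the assignment loop of Flink's sliding window assigners (assuming an offset of 0), and `slidingCounts` mimics the per-window reduce; both names are hypothetical. A single message at t = 10 s with n = 1 minute and a 30 s slide lands in the windows [-30 s, 30 s) and [0 s, 60 s). The window [30 s, 90 s) never receives an element, so no count of 0 is ever produced for it.

```scala
// Hypothetical model (not Flink code) of how a sliding assigner places one
// element into windows, following the loop of Flink's sliding assigners
// with offset 0.
object SlidingAssignment {
  def windowsFor(ts: Long, sizeMs: Long, slideMs: Long): List[(Long, Long)] = {
    val lastStart = ts - Math.floorMod(ts, slideMs)
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > ts - sizeMs)
      .map(start => (start, start + sizeMs))
      .toList
  }

  // Per-window counts as a windowed reduce would produce them: only windows
  // that received at least one element appear, so no window ever reports 0.
  def slidingCounts(timestamps: Seq[Long], sizeMs: Long, slideMs: Long): Map[(Long, Long), Int] =
    timestamps
      .flatMap(windowsFor(_, sizeMs, slideMs))
      .groupBy(identity)
      .map { case (window, hits) => window -> hits.size }
}
```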
We came up with the following solution for the time being:
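import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger

messageStream
  .keyBy(msg => msg.emitterType + msg.messageType) // placeholder fields: EmitterType + MessageType
  .window(GlobalWindows.create())
  .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
  .evictor(new CustomEvictor(...)) // custom: evicts all messages older than n minutes BEFORE the window is processed
  .process(new CustomCounter(...)) // custom: counts all messages in the window state
  .addSink(...)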
In the case of zero messages in the last n minutes, all messages are
evicted from the window and the process function is triggered one last
time on the now empty window, so we can produce a count of 0.
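Stripped of the Flink API, the per-key logic of this workaround can be modelled as follows. This is a hypothetical sketch; `EvictingCounter`, `onElement` and `onTrigger` are invented names. It makes the cost visible: every message is buffered, and each firing scans the whole buffer before counting. An empty buffer, on the other hand, naturally yields 0.

```scala
// Hypothetical plain-Scala model (not Flink code) of the GlobalWindows +
// custom evictor + counting process function, for a single key.
final class EvictingCounter(sizeMs: Long) {
  // Every message (here: just its timestamp) stays in window state until evicted.
  private var buffer = Vector.empty[Long]

  def onElement(ts: Long): Unit = { buffer = buffer :+ ts }

  // One trigger firing at time `now`: evict first, then count what is left.
  def onTrigger(now: Long): Int = {
    buffer = buffer.filter(_ > now - sizeMs) // pass 1: compare every timestamp
    buffer.size // the real process function iterates a second time to count
  }
}
```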
I have two problems with this solution, though:
1) It is computationally inefficient for a simple count, as a window with
an evictor and a process window function always keeps all messages in
state. On top of that, on every trigger all elements have to be touched
twice: once to compare the timestamp and once to count.
2) It seems like a very roundabout solution to a simple problem.
So I was wondering if there is a more efficient or more "Flink-like"
approach to this. Sorry for the long write-up, but I would love to hear
your take.
Best regards
Jan
--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen
Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de
https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi
Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501