Hi,

I also think there are different ways to achieve the target. For the first option listed previously, the pseudo-code would look roughly like this:
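    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Assumes a String key (e.g. EmitterType + MessageType) and processing-time timers every 30 s.
    class MyFunction<T> extends KeyedProcessFunction<String, T, Tuple2<String, Integer>> {
        private static final long INTERVAL_MS = 30_000L;
        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            // Create the value state; Flink scopes it to the current key
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Integer.class));
        }

        @Override
        public void processElement(T t, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer current = count.value();
            if (current == null) { // first element for this key: register the first timer
                ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + INTERVAL_MS);
                current = 0;
            }
            count.update(current + 1); // update the count
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            out.collect(Tuple2.of(ctx.getCurrentKey(), count.value()));
            count.update(0); // reset so that the next interval is counted from 0
            ctx.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS); // register the following timer
        }
    }

1. In Flink, state and timers are implicitly bound to the current key, so I think they do not need to be bound to the key manually.
2. To get rid of outdated state, it can be removed via count.clear() once the count has been 0 for a long time. There are different ways to measure that interval: for example, register an additional timer and delete it whenever a new element arrives, or update the counter to -1, -2, ... to record how many timers have fired since the last element.

Best,
Yun

------------------Original Mail ------------------
Sender:Khachatryan Roman <khachatryan.ro...@gmail.com>
Send Date:Tue Feb 9 02:35:20 2021
Recipients:Jan Brusch <jan.bru...@neuland-bfi.de>
CC:Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
Subject:Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,

Another solution would probably be to register a timer (using a KeyedProcessFunction) once we see an element after keyBy. The timer fires after windowIntervalMs. When it fires, it emits a dummy element which is ignored (or subtracted) at the end. Upon receiving each new element, the function shifts the timer accordingly.

Regards,
Roman

On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

Hi Yun,

thanks for your reply. I do agree with your point that standard windows are meant for high-level operations and that the lower-level APIs offer a rich toolset for most advanced use cases.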
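The idle-state clearing from point 2 of Yun's reply (the "-1, -2, ..." variant) can be made concrete with a small Flink-free simulation. This is a minimal sketch under stated assumptions: the class IdleClearingCounter, its methods and the threshold maxIdleIntervals are hypothetical; a HashMap stands in for the keyed ValueState, a key being present in the map stands in for its registered timer, and fireTimers() plays the role of onTimer firing once for every such key. It emits 0 for each empty interval and drops the key after maxIdleIntervals consecutive empty intervals.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class IdleClearingCounter {
    // Hypothetical threshold: clear a key after this many consecutive empty intervals.
    private final int maxIdleIntervals;
    // Stands in for the keyed ValueState<Integer>; a present key means its timer is registered.
    private final Map<String, Integer> state = new HashMap<>();

    public IdleClearingCounter(int maxIdleIntervals) {
        this.maxIdleIntervals = maxIdleIntervals;
    }

    // Like processElement: the first element creates the state (and "registers" the timer);
    // a negative idle marker means the key was quiet, so counting restarts from 0.
    public void onElement(String key) {
        int current = state.getOrDefault(key, 0);
        state.put(key, Math.max(current, 0) + 1);
    }

    // Like onTimer, fired once for every key that still has state; returns key -> emitted count.
    public Map<String, Integer> fireTimers() {
        Map<String, Integer> emitted = new TreeMap<>();
        Iterator<Map.Entry<String, Integer>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> e = it.next();
            int current = e.getValue();
            if (current > 0) {
                emitted.put(e.getKey(), current);
                e.setValue(0); // reset for the next interval
                continue;
            }
            emitted.put(e.getKey(), 0); // the count is (still) zero
            int idle = -current + 1;    // consecutive empty intervals, including this one
            if (idle >= maxIdleIntervals) {
                it.remove();            // like count.clear(): no further timer for this key
            } else {
                e.setValue(-idle);      // record the idle streak as -1, -2, ...
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        IdleClearingCounter c = new IdleClearingCounter(2);
        c.onElement("sensorA/temp");
        c.onElement("sensorA/temp");
        System.out.println(c.fireTimers()); // {sensorA/temp=2}
        System.out.println(c.fireTimers()); // {sensorA/temp=0}
        System.out.println(c.fireTimers()); // {sensorA/temp=0} (state cleared afterwards)
        System.out.println(c.fireTimers()); // {}
    }
}
```

In a real KeyedProcessFunction the clearing branch would call count.clear() and simply not register the next timer; a new element for that key then starts over as if it were the first one.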
I have also tried to solve my problem with KeyedProcessFunctions, but was not able to get it to work for two reasons:
1) I was not able to set up a combination of ValueState, timers and triggers that emulated a sliding window with a rising and falling count (including 0) well enough.
2) Memory leak: states / windows should be cleared after they have been at count 0 for a certain time, in order to prevent an ever-growing number of ValueStates that are no longer needed.

Could you please elaborate in pseudo-code how you would envision your solution?

Best regards
Jan

On 08.02.21 05:31, Yun Gao wrote:

Hi Jan,

From my view, windows in Flink are a "high-level" operation for certain kinds of aggregation, and if they cannot satisfy the requirements, we can at least fall back to the "low-level" API, i.e. a KeyedProcessFunction [1]. In this case, we could use a ValueState to store the current value for each key and increment the value on each element. We could also register a timer for each key upon receiving the first element for that key, and in the onTimer callback send the current state value, reset the value to 0 and register another timer for this key 30 s later.

Best,
Yun

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction

------------------Original Mail ------------------
Sender:Jan Brusch <jan.bru...@neuland-bfi.de>
Send Date:Sat Feb 6 23:44:00 2021
Recipients:user <user@flink.apache.org>
Subject:Sliding Window Count: Tricky Edge Case / Count Zero Problem

Hi,

I was recently working on a problem where we wanted to implement a simple count on a sliding window, e.g. "how many messages of a certain type were emitted by a certain type of sensor in the last n minutes".
That sounds simple enough in theory:

    messageStream
      .map(msg => (/* EmitterType + MessageType of msg */, 1)) // map to 1 before windowing; a WindowedStream has no map
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
      .reduce((x, y) => (x._1, x._2 + y._2))
      .addSink(...)

But there is a tricky edge case: the downstream systems will never know when the count for a certain key goes back to 0, which is important for our use case. The technical reason is that Flink doesn't open a window if there are no entries, i.e. a window with count 0 doesn't exist in Flink.

We came up with the following solution for the time being:

    messageStream
      .keyBy(/* EmitterType + MessageType */)
      .window(GlobalWindows.create())
      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
      .evictor(/* CustomEvictor: evict all messages older than n minutes BEFORE processing the window */)
      .process(/* CustomCounter: count all messages in the window state */)
      .addSink(...)

In the case of zero messages in the last n minutes, all messages will be evicted from the window and the process function will get triggered one last time on the now empty window, so we can produce a count of 0.

I have two problems with this solution, though:
1) It is computationally inefficient for a simple count, as custom process functions always keep all messages in state. And on every trigger, all elements have to be touched twice: to compare the timestamp and to count.
2) It seems like a very roundabout solution to a simple problem.

So I was wondering whether there is a more efficient or "Flink-like" approach to this.

Sorry for the long write-up, but I would love to hear your takes.

Best regards
Jan

--
neuland – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen
Telefon (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de
https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi
Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID.
DE 246585501
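For comparison, the GlobalWindows + evictor workaround from the original post can be simulated without Flink. In this minimal sketch, the class EvictingSlidingCount and its methods are hypothetical: a per-key ArrayDeque of timestamps stands in for the window state, fire(nowMs) plays the role of one 30 s trigger firing followed by the evictor and the counting process function, and timestamps are assumed to arrive in non-decreasing order per key. Dropping a key right after its 0 has been emitted is an added assumption, not part of the original workaround.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class EvictingSlidingCount {
    private final long windowMs; // n minutes, in milliseconds
    // Stands in for the GlobalWindows state: every message's timestamp, per key.
    private final Map<String, Deque<Long>> buffers = new HashMap<>();

    public EvictingSlidingCount(long windowMs) {
        this.windowMs = windowMs;
    }

    // Buffer the message; timestamps are assumed to be non-decreasing per key.
    public void onElement(String key, long timestampMs) {
        buffers.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestampMs);
    }

    // One trigger firing at nowMs: evict messages older than windowMs, then count the rest.
    public Map<String, Integer> fire(long nowMs) {
        Map<String, Integer> counts = new TreeMap<>();
        Iterator<Map.Entry<String, Deque<Long>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<Long>> e = it.next();
            Deque<Long> timestamps = e.getValue();
            while (!timestamps.isEmpty() && nowMs - timestamps.peekFirst() > windowMs) {
                timestamps.pollFirst(); // the evictor runs BEFORE counting
            }
            counts.put(e.getKey(), timestamps.size()); // can be 0, which a plain sliding window never emits
            if (timestamps.isEmpty()) {
                it.remove(); // assumption: forget the key once its 0 has been emitted
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        EvictingSlidingCount c = new EvictingSlidingCount(60_000); // n = 1 minute
        c.onElement("sensorA/temp", 0);
        System.out.println(c.fire(30_000));  // {sensorA/temp=1}
        c.onElement("sensorA/temp", 50_000);
        System.out.println(c.fire(60_000));  // {sensorA/temp=2}
        System.out.println(c.fire(90_000));  // {sensorA/temp=1}
        System.out.println(c.fire(120_000)); // {sensorA/temp=0}
        System.out.println(c.fire(150_000)); // {}
    }
}
```

Every message's timestamp stays buffered until it is evicted, so memory grows with the message rate; that is the inefficiency raised in point 1 of the original post. Yun's KeyedProcessFunction pseudo-code keeps a single integer per key instead, but it does not by itself give a sliding n-minute count.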