Hi Jan,

Thanks for sharing your solution.
You probably also want to remove previously created timer(s) in processElement, so that you don't end up with one timer per element. For that, you can store the timestamp of the previously registered timer (in function state).
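A minimal sketch of Roman's suggestion follows. It is written as a plain-Java model so that it runs without Flink. The maps are hypothetical stand-ins for a per-key `ValueState<Long>` and the keyed timer service. The comments name the `TimerService` calls (`registerEventTimeTimer`, `deleteEventTimeTimer`) that each step corresponds to. The class and method names are illustrative and do not come from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Plain-Java model, no Flink dependency. "lastTimer" stands in for a per-key
// ValueState<Long>, and "timers" for the keyed timer service.
final class ShiftingTimerModel {
    private final long windowSizeMs;
    // Pending timers per key; a set, since Flink keeps at most one timer per key and timestamp.
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();
    // Timestamp of the timer registered most recently for each key.
    private final Map<String, Long> lastTimer = new HashMap<>();

    ShiftingTimerModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void processElement(String key, long timestamp) {
        TreeSet<Long> keyTimers = timers.computeIfAbsent(key, k -> new TreeSet<>());
        Long previous = lastTimer.get(key);
        if (previous != null) {
            // In Flink: ctx.timerService().deleteEventTimeTimer(previous)
            keyTimers.remove(previous);
        }
        long next = timestamp + windowSizeMs;
        // In Flink: ctx.timerService().registerEventTimeTimer(next)
        keyTimers.add(next);
        lastTimer.put(key, next);
    }

    int pendingTimers(String key) {
        TreeSet<Long> keyTimers = timers.get(key);
        return keyTimers == null ? 0 : keyTimers.size();
    }

    Long lastRegisteredTimer(String key) {
        return lastTimer.get(key);
    }
}
```

This keeps exactly one pending timer per key, which fits an inactivity or cleanup timer. In the decrementing solution quoted below, each timer is expected to account for one element. Deleting earlier timers there would therefore also drop their decrements, so the two ideas would need to be combined differently.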
Regards,
Roman

On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi everybody,
>
> I just wanted to say thanks again for all your input and share the
> (surprisingly simple) solution that we came up with in the meantime:
>
> class SensorRecordCounter extends KeyedProcessFunction<String, SensorRecord, SensorCount> {
>
>     private ValueState<SensorCount> state;
>     private long windowSizeMs = 60000L;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         state = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("sensorCount", SensorCount.class));
>     }
>
>     @Override
>     public void processElement(SensorRecord sensorRecord, Context ctx,
>             Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         if (count == null) {
>             count = new SensorCount();
>             count.setSensorID(sensorRecord.getSensorID());
>             count.setCount(0);
>         }
>         count.increase();
>         state.update(count);
>         out.collect(count);
>
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + windowSizeMs);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>             Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         count.decrease();
>         state.update(count);
>         out.collect(count);
>
>         if (count.getCount() <= 0) {
>             state.clear();
>         }
>     }
> }
>
> Best regards and a nice weekend
>
> Jan
>
> On 09.02.21 08:28, Arvid Heise wrote:
> > Hi Jan,
> >
> > Another solution is to insert heartbeat events at the source for each
> > sensor. The solution is very similar to how watermarks are advanced when
> > there are no elements in the respective source partition.
> >
> > However, it's only easy to implement if you have your own source and know
> > all sensors on application start. It might also be possible to implement if
> > you use the new Source interface.
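Arvid's heartbeat idea can be illustrated with a small plain-Java model of a single sliding window. It has no Flink dependency, and `SensorEvent` and `HeartbeatWindowCount` are hypothetical names. The assumption is that the source emits a heartbeat for every known sensor, and that the `map(_ => 1)` step from the original question maps heartbeats to 0 instead. A window that contains only heartbeats then still exists and reports a count of 0.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type: a real sensor message, or a synthetic heartbeat
// that the source emits periodically for every known sensor.
final class SensorEvent {
    final String sensorId;
    final long timestamp;
    final boolean heartbeat;

    SensorEvent(String sensorId, long timestamp, boolean heartbeat) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
        this.heartbeat = heartbeat;
    }
}

final class HeartbeatWindowCount {
    // Models one sliding window [windowEnd - sizeMs, windowEnd) for all keys:
    // real messages map to 1, heartbeats map to 0, and values are summed per key.
    static Map<String, Integer> countWindow(List<SensorEvent> events, long windowEnd, long sizeMs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (SensorEvent e : events) {
            if (e.timestamp >= windowEnd - sizeMs && e.timestamp < windowEnd) {
                // A heartbeat adds 0 but still makes the key's window exist,
                // so the window reports 0 instead of emitting nothing.
                counts.merge(e.sensorId, e.heartbeat ? 0 : 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

A sensor only gets a 0 for a window if at least one of its heartbeats falls into that window. Heartbeats therefore have to be frequent enough that every window contains one per sensor, for example one per slide interval.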
> >
> > On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao...@aliyun.com> wrote:
>>
>> Hi,
>>
>> I also think there are different ways to achieve this. For the first
>> option I listed previously, the pseudo-code would look roughly like this:
>>
>> class MyFunction extends KeyedProcessFunction {
>>     ValueState<Integer> count;
>>
>>     void open() {
>>         count = ... // Create the value state
>>     }
>>
>>     void processElement(T t, Context context, Collector collector) {
>>         Integer current = count.value();
>>         if (current == null) {
>>             context.timerService().registerTimer(30); // Register timer for the first time
>>             current = 0;
>>         }
>>
>>         count.update(current + 1); // update the count
>>     }
>>
>>     void onTimer(...) {
>>         collector.collect(new Tuple2<>(getCurrentKey(), count.value()));
>>         context.timerService().registerTimer(30); // register the following timer
>>     }
>> }
>>
>> 1. In Flink, state and timers are bound to a key implicitly, so I think
>> they do not need to be bound manually.
>> 2. Outdated state could be cleared via count.clear() once the count has
>> been 0 for a long time. There are different ways to measure that interval,
>> e.g. register another timer and delete it when elements are received, or
>> update the counter to -1, -2, ... to mark how many timers have fired.
>>
>> Best,
>> Yun
>>
>> ------------------Original Mail ------------------
>> Sender: Khachatryan Roman <khachatryan.ro...@gmail.com>
>> Send Date: Tue Feb 9 02:35:20 2021
>> Recipients: Jan Brusch <jan.bru...@neuland-bfi.de>
>> CC: Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
>> Subject: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>
>>> Hi,
>>>
>>> Probably another solution would be to register a timer
>>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>>> which will be ignored (or subtracted) in the end.
>>> Upon receiving each new element, the function will shift the timer
>>> accordingly.
>>>
>>> Regards,
>>> Roman
>>>
>>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <jan.bru...@neuland-bfi.de>
>>> wrote:
>>>
>>>> Hi Yun,
>>>>
>>>> thanks for your reply.
>>>>
>>>> I do agree with your point about standard windows being for high-level
>>>> operations and the lower-level APIs offering a rich toolset for most
>>>> advanced use cases.
>>>>
>>>> I have tried to solve my problem with KeyedProcessFunctions as well, but
>>>> was not able to get it to work for two reasons:
>>>>
>>>> 1) I was not able to set up a combination of ValueState, timers and
>>>> triggers that emulated a sliding window with a rising and falling count
>>>> (including 0) well enough.
>>>>
>>>> 2) Memory leak: states / windows should be cleared after a certain time
>>>> at count 0 in order to prevent an unbounded growth of ValueStates
>>>> (that are not needed anymore).
>>>>
>>>> Could you maybe elaborate in pseudocode how you would envision
>>>> your solution?
>>>>
>>>> Best regards
>>>>
>>>> Jan
>>>>
>>>> On 08.02.21 05:31, Yun Gao wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> From my view, a window in Flink is meant as a "high-level" operation for
>>>> certain kinds of aggregation; if it cannot satisfy the requirements, we
>>>> can always turn to the "low-level" API and use a KeyedProcessFunction [1].
>>>>
>>>> In this case, we could use a ValueState to store the current value for
>>>> each key and increment the value on each element. We could also register
>>>> a timer for each key on receiving the first element for this key. In the
>>>> onTimer callback, we could then send the current state value, reset the
>>>> value to 0 and register another timer for this key 30s later.
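The approach Yun describes can be sketched as a plain-Java model with no Flink dependency. Per key, a count and a single pending timer stand in for `ValueState<Integer>` and the timer service, and `advanceTo` plays the role of the runtime calling `onTimer`. The model also includes the state cleanup Yun mentions elsewhere in the thread, but it uses a separate idle counter instead of negative counts. `maxIdleFirings` and all class and method names are hypothetical. Note that this produces a count per 30s interval, not a sliding n-minute count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink dependency). The maps stand in for per-key
// ValueStates, and advanceTo() stands in for the runtime firing timers.
final class IntervalCounter {
    private final long intervalMs;
    private final int maxIdleFirings; // assumption: clear state after this many zero counts in a row
    private final Map<String, Integer> count = new HashMap<>();
    private final Map<String, Long> timer = new HashMap<>();
    private final Map<String, Integer> idleFirings = new HashMap<>();
    final List<String> output = new ArrayList<>();

    IntervalCounter(long intervalMs, int maxIdleFirings) {
        this.intervalMs = intervalMs;
        this.maxIdleFirings = maxIdleFirings;
    }

    void processElement(String key, long now) {
        if (!count.containsKey(key)) {
            // First element for this key (or first after a clear): register the timer.
            count.put(key, 0);
            timer.put(key, now + intervalMs);
        }
        count.merge(key, 1, Integer::sum);
        idleFirings.put(key, 0);
    }

    // Fires every pending timer whose timestamp is <= now (per key, in timestamp order).
    void advanceTo(long now) {
        for (String key : new ArrayList<>(timer.keySet())) {
            Long next = timer.get(key);
            while (next != null && next <= now) {
                next = onTimer(key, next);
            }
        }
    }

    // Emits the interval count, resets it to 0 and re-registers the timer,
    // or clears all state for the key once it has been idle long enough.
    private Long onTimer(String key, long timestamp) {
        int current = count.get(key);
        output.add(key + "@" + timestamp + "=" + current);
        int idle = current == 0 ? idleFirings.get(key) + 1 : 0;
        if (idle >= maxIdleFirings) {
            count.remove(key);
            timer.remove(key);
            idleFirings.remove(key);
            return null;
        }
        count.put(key, 0);
        idleFirings.put(key, idle);
        long next = timestamp + intervalMs;
        timer.put(key, next);
        return next;
    }
}
```

With a 30 s interval and `maxIdleFirings = 2`, two elements at 0 s and 10 s produce `s1@30000=2`, `s1@60000=0` and `s1@90000=0`. After that, the key's state is gone until the next element arrives.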
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>>
>>>> ------------------Original Mail ------------------
>>>> Sender: Jan Brusch <jan.bru...@neuland-bfi.de>
>>>> Send Date: Sat Feb 6 23:44:00 2021
>>>> Recipients: user <user@flink.apache.org>
>>>> Subject: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>>>
>>>>> Hi,
>>>>> I was recently working on a problem where we wanted to implement a
>>>>> simple count on a sliding window, e.g. "how many messages of a certain
>>>>> type were emitted by a certain type of sensor in the last n minutes".
>>>>> This sounds simple enough in theory:
>>>>>
>>>>> messageStream
>>>>>   .keyBy(//EmitterType + MessageType)
>>>>>   .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
>>>>>   .map(_ => 1)
>>>>>   .reduce((x,y) => x + y)
>>>>>   .addSink(...)
>>>>>
>>>>> But there is a tricky edge case: the downstream systems will never know
>>>>> when the count for a certain key goes back to 0, which is important for
>>>>> our use case. The technical reason is that Flink doesn't open a window
>>>>> if there are no entries, i.e. a window with count 0 doesn't exist in Flink.
>>>>>
>>>>> We came up with the following solution for the time being:
>>>>>
>>>>> messageStream
>>>>>   .keyBy(//EmitterType + MessageType)
>>>>>   .window(GlobalWindows.create())
>>>>>   .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>>>>   .evictor(// CustomEvictor: Evict all messages older than n minutes BEFORE processing the window)
>>>>>   .process(// CustomCounter: Count all Messages in Window State)
>>>>>   .addSink(...)
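The GlobalWindows + evictor workaround above can be modeled for a single key as follows. The model is plain Java, and all names are hypothetical. Every element stays buffered, and each trigger firing first evicts elements older than the window size and then counts the rest, which is how the count can drop to 0. Flink's built-in `TimeEvictor` measures age relative to the newest element in the window rather than the current time, so it would presumably never empty the window. That is likely why a custom evictor is needed here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model of one key's GlobalWindow with a custom "evict before" step.
final class EvictingCounter {
    private final long windowSizeMs;
    // All element timestamps stay buffered, like the window state in Flink.
    private final Deque<Long> buffered = new ArrayDeque<>();

    EvictingCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void add(long timestamp) {
        buffered.addLast(timestamp);
    }

    // Called on each trigger firing at time 'now'; returns the count, possibly 0.
    int fire(long now) {
        // Evict-before step: scans every buffered element and drops those
        // that are at least windowSizeMs old.
        buffered.removeIf(ts -> ts <= now - windowSizeMs);
        // In the quoted setup, the process function would iterate the remaining
        // elements again to count them; size() stands in for that second pass.
        return buffered.size();
    }
}
```

The `removeIf` scan touches every buffered element on every firing. This is the cost Jan describes in the message.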
>>>>>
>>>>> In the case of zero messages in the last n minutes, all messages will be
>>>>> evicted from the window, and the process function will be triggered one
>>>>> last time on the now-empty window, so we can produce a count of 0.
>>>>>
>>>>> I have two problems with this solution, though:
>>>>> 1) It is computationally inefficient for a simple count, as custom
>>>>> process functions will always keep all messages in state. And on every
>>>>> trigger, all elements have to be touched twice: to compare the
>>>>> timestamp and to count.
>>>>> 2) It seems like a very roundabout solution to a simple problem.
>>>>>
>>>>> So, I was wondering if there is a more efficient or "Flink-like"
>>>>> approach to this. Sorry for the long write-up, but I would love to hear
>>>>> your takes.
>>>>>
>>>>> Best regards
>>>>> Jan
>>>>>
>>>>> --
>>>>> neuland – Büro für Informatik GmbH
>>>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>>>
>>>>> Phone (0421) 380107 57
>>>>> Fax (0421) 380107 99
>>>>> https://www.neuland-bfi.de
>>>>>
>>>>> https://twitter.com/neuland
>>>>> https://facebook.com/neulandbfi
>>>>> https://xing.com/company/neulandbfi
>>>>>
>>>>> Managing Directors: Thomas Gebauer, Jan Zander
>>>>> Register court: Amtsgericht Bremen, HRB 23395 HB
>>>>> VAT ID: DE 246585501
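There is one caveat with the decrementing `SensorRecordCounter` quoted above. Flink keeps at most one timer per key and timestamp. When two records of the same sensor carry the same timestamp, `registerEventTimeTimer(ctx.timestamp() + windowSizeMs)` therefore results in a single `onTimer` call. The count is then decremented only once, never returns to 0, and the state is never cleared. The plain-Java model below sketches one way around this. It has no Flink dependency and uses hypothetical names. The idea is to store how many elements expire at each timestamp and to subtract that number when the timer fires. In Flink, the map would presumably become a `MapState<Long, Integer>` next to the count. This variant was not discussed in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the decrementing sliding count for one key, adjusted for
// timer de-duplication: expiry timestamp -> number of elements expiring then.
final class SlidingCounter {
    private final long windowSizeMs;
    // Doubles as the de-duplicated timer queue: one entry per distinct timestamp.
    private final TreeMap<Long, Integer> expiries = new TreeMap<>();
    private int count = 0;
    final List<Integer> output = new ArrayList<>();

    SlidingCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void processElement(long timestamp) {
        count++;
        // Registering the same timestamp twice yields one timer in Flink,
        // so the map keeps the multiplicity instead.
        expiries.merge(timestamp + windowSizeMs, 1, Integer::sum);
        output.add(count);
    }

    // Fires all timers with timestamp <= watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!expiries.isEmpty() && expiries.firstKey() <= watermark) {
            Map.Entry<Long, Integer> due = expiries.pollFirstEntry();
            // Subtract every element that expires at this timestamp, not just 1.
            count -= due.getValue();
            output.add(count);
        }
    }

    // True once nothing is left, i.e. the point where the state could be cleared.
    boolean isEmpty() {
        return count == 0 && expiries.isEmpty();
    }
}
```

Take a 60 s window with two records at 1000 ms and one at 5000 ms. These produce the counts 1, 2, 3, then 1 when the watermark reaches 61000 ms, and 0 at 65000 ms.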