Hi Jan,

Thanks for sharing your solution.
You probably also want to delete the previously registered timer(s) in
processElement, so that you don't end up with one timer per element.
For that, you can store the timestamp of the previous timer in function state.
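
To illustrate the bookkeeping, here is a minimal plain-Java sketch without any Flink dependency. It only models the timer handling for a single key and is not a drop-in replacement for your function. The class name, the fields and the TreeSet standing in for the timer service are hypothetical. In a real KeyedProcessFunction the two marked lines would presumably map to ctx.timerService().deleteEventTimeTimer(...) and ctx.timerService().registerEventTimeTimer(...), with the previous timestamp kept in a ValueState<Long>.

```java
import java.util.TreeSet;

/**
 * Plain-Java model (no Flink dependency) of keeping at most one pending
 * timer for a key. All names here are hypothetical.
 */
class SingleTimerSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Stand-in for the timers Flink holds for one key.
    final TreeSet<Long> pendingTimers = new TreeSet<>();
    // Stand-in for a ValueState<Long> remembering the last registered timer.
    Long previousTimer = null;

    void processElement(long elementTimestamp) {
        if (previousTimer != null) {
            // In Flink (assumed mapping): ctx.timerService().deleteEventTimeTimer(previousTimer)
            pendingTimers.remove(previousTimer);
        }
        long nextTimer = elementTimestamp + WINDOW_SIZE_MS;
        // In Flink (assumed mapping): ctx.timerService().registerEventTimeTimer(nextTimer)
        pendingTimers.add(nextTimer);
        previousTimer = nextTimer;
    }

    /** Called when the single pending timer fires: the key was silent for one window. */
    void onTimer(long timestamp) {
        pendingTimers.remove(timestamp);
        previousTimer = null;
    }

    public static void main(String[] args) {
        SingleTimerSketch fn = new SingleTimerSketch();
        fn.processElement(1_000L);
        fn.processElement(2_000L);
        fn.processElement(5_000L);
        // Only the timer belonging to the last element remains.
        System.out.println(fn.pendingTimers); // prints [65000]
    }
}
```

With this pattern the timer only fires once no element has arrived for a full window, so it marks the point at which the key has gone quiet.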

Regards,
Roman


On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch <jan.bru...@neuland-bfi.de>
wrote:

> Hi everybody,
>
> I just wanted to say thanks again for all your input and share the
> (surprisingly simple) solution that we came up with in the meantime:
>
> class SensorRecordCounter extends KeyedProcessFunction<String, SensorRecord, SensorCount> {
>
>     // Running count for the current key (sensor).
>     private ValueState<SensorCount> state;
>     private final long windowSizeMs = 60000L;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         state = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("sensorCount", SensorCount.class));
>     }
>
>     @Override
>     public void processElement(SensorRecord sensorRecord, Context ctx,
>             Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         if (count == null) {
>             // First element for this key: start counting from 0.
>             count = new SensorCount();
>             count.setSensorID(sensorRecord.getSensorID());
>             count.setCount(0);
>         }
>         count.increase();
>         state.update(count);
>         out.collect(count);
>
>         // This element leaves the window after windowSizeMs.
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + windowSizeMs);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>             Collector<SensorCount> out) throws Exception {
>         SensorCount count = state.value();
>         if (count == null) {
>             return; // state was already cleared for this key
>         }
>         count.decrease();
>         state.update(count);
>         out.collect(count);
>
>         if (count.getCount() <= 0) {
>             // Nothing left in the window: drop the state for this key.
>             state.clear();
>         }
>     }
> }
>
>
> Best regards and a nice weekend
>
> Jan
>
>
> On 09.02.21 08:28, Arvid Heise wrote:
>
> Hi Jan,
>
> Another solution is to insert Heartbeat-events at the source for each
> sensor. The solution is very similar to how to advance watermarks when
> there are no elements in the respective source partition.
>
> However, it's only easy to implement if you have your own source and know
> all sensors on application start. It might also be possible to implement if
> you use a new Source interface.
>
> On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>>
>> Hi,
>>
>> I also think there are different ways to achieve this. For the first
>> option listed previously, the pseudo-code would look roughly like this:
>>
>> class MyFunction extends KeyedProcessFunction {
>>     ValueState<Integer> count;
>>
>>     void open() {
>>         count = ... // Create the value state
>>     }
>>
>>     void processElement(T t, Context context, Collector collector) {
>>         Integer current = count.value();
>>         if (current == null) {
>>             // First element for this key: register the first timer, 30s from
>>             // now (processing time is assumed here)
>>             context.timerService().registerProcessingTimeTimer(
>>                 context.timerService().currentProcessingTime() + 30000L);
>>             current = 0;
>>         }
>>
>>         count.update(current + 1); // update the count
>>     }
>>
>>     void onTimer(...) {
>>         collector.collect(new Tuple2<>(context.getCurrentKey(), count.value()));
>>         // register the following timer
>>         context.timerService().registerProcessingTimeTimer(
>>             context.timerService().currentProcessingTime() + 30000L);
>>     }
>> }
>>
>> 1. In Flink, state and timers are both implicitly bound to the current
>> key, so I think they do not need to be bound manually.
>> 2. Outdated state can be cleared via count.clear() once the count has
>> been 0 for a long time. There are different ways to measure that
>> interval, e.g. register another timer and delete it when an element is
>> received, or set the counter to -1, -2, ... to mark how many timer
>> intervals have passed.
>>
>>
>> Best,
>>  Yun
>>
>>
>>
>>
>> ------------------Original Mail ------------------
>> *Sender:*Khachatryan Roman <khachatryan.ro...@gmail.com>
>> *Send Date:*Tue Feb 9 02:35:20 2021
>> *Recipients:*Jan Brusch <jan.bru...@neuland-bfi.de>
>> *CC:*Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
>> *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>
>>> Hi,
>>>
>>> Probably another solution would be to register a timer
>>> (using KeyedProcessFunction) once we see an element after keyBy. The timer
>>> will fire in windowIntervalMs. Upon firing, it will emit a dummy element
>>> which will be ignored (or subtracted) in the end.
>>> Upon receiving each new element, the function will shift the timer
>>> accordingly.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <jan.bru...@neuland-bfi.de>
>>> wrote:
>>>
>>>> Hi Yun,
>>>>
>>>> thanks for your reply.
>>>>
>>>> I do agree with your point about standard windows being for high level
>>>> operations and the lower-level apis offering a rich toolset for most
>>>> advanced use cases.
>>>>
>>>> I have tried to solve my problem with keyedProcessFunctions also but
>>>> was not able to get it to work for two reasons:
>>>>
>>>> 1) I was not able to set up a combination of ValueState, Timers and
>>>> Triggers that emulated a sliding window with a rising and falling count
>>>> (including 0) well enough.
>>>>
>>>> 2) Memory leak: state / windows should be cleared after the count has
>>>> been 0 for a certain time, in order to prevent unbounded growth of
>>>> ValueStates (that are not needed anymore).
>>>>
>>>>
>>>> Can you maybe please elaborate in pseudocode how you would envision
>>>> your solution?
>>>>
>>>>
>>>> Best regards
>>>>
>>>> Jan
>>>> On 08.02.21 05:31, Yun Gao wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> From my view, windows in Flink are a "high-level" operation for
>>>> certain kinds of aggregation. If they cannot satisfy the requirements,
>>>> we can fall back to the "low-level" API, i.e. KeyedProcessFunction[1].
>>>>
>>>> In this case, we could use a ValueState to store the current value for
>>>> each key and increment the value on each element. We could also
>>>> register a timer for each key when its first element arrives. In the
>>>> onTimer callback, we could then emit the current state value, reset
>>>> the value to 0 and register another timer for this key 30s later.
>>>>
>>>> Best,
>>>>  Yun
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Jan Brusch <jan.bru...@neuland-bfi.de>
>>>> *Send Date:*Sat Feb 6 23:44:00 2021
>>>> *Recipients:*user <user@flink.apache.org>
>>>> *Subject:*Sliding Window Count: Tricky Edge Case / Count Zero Problem
>>>>
>>>>> Hi,
>>>>> I was recently working on a problem where we wanted to implement a
>>>>> simple count on a sliding window, e.g. "how many messages of a certain
>>>>> type were emitted by a certain type of sensor in the last n minutes".
>>>>> Which sounds simple enough in theory:
>>>>>
>>>>> messageStream
>>>>>      .keyBy(//EmitterType + MessageType)
>>>>>      .assignWindow(SlidingProcessingTimeWindows.of(Time.minutes(n),
>>>>> Time.seconds(30)))
>>>>>      .map(_ => 1)
>>>>>      .reduce((x,y) => x + y)
>>>>>      .addSink(...)
>>>>>
>>>>>
>>>>> But there is a tricky edge case: The downstream systems will never know
>>>>> when the count for a certain key goes back to 0, which is important for
>>>>> our use case. The technical reason is that Flink doesn't open a
>>>>> window if there are no entries, i.e. a window with count 0 doesn't exist
>>>>> in Flink.
>>>>>
>>>>> We came up with the following solution for the time being:
>>>>>
>>>>> messageStream
>>>>>      .keyBy(//EmitterType + MessageType)
>>>>>      .window(GlobalWindows.create())
>>>>>      .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
>>>>>      .evictor(// CustomEvictor: Evict all messages older than n minutes BEFORE processing the window)
>>>>>      .process(// CustomCounter: Count all Messages in Window State)
>>>>>      .addSink(...)
>>>>>
>>>>>
>>>>> In the case of zero messages in the last n minutes, all messages will be
>>>>> evicted from the window and the process-function will get triggered one
>>>>> last time on the now empty window, so we can produce a count of 0.
>>>>>
>>>>> I have two problems, though, with this solution:
>>>>> 1) It is computationally inefficient for a simple count, as custom
>>>>> process functions will always keep all messages in state. And, on every
>>>>> trigger all elements will have to be touched twice: To compare the
>>>>> timestamp and to count.
>>>>> 2) It does seem like a very roundabout solution to a simple problem.
>>>>>
>>>>> So, I was wondering if there was a more efficient or "flink-like"
>>>>> approach to this. Sorry for the long writeup, but I would love to hear
>>>>> your takes.
>>>>>
>>>>>
>>>>> Best regards
>>>>> Jan
>>>>>
>>>>> --
>>>>> neuland  – Büro für Informatik GmbH
>>>>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>>>>
>>>>> Telefon (0421) 380107 57
>>>>> Fax (0421) 380107 99
>>>>> https://www.neuland-bfi.de
>>>>>
>>>>> https://twitter.com/neuland
>>>>> https://facebook.com/neulandbfi
>>>>> https://xing.com/company/neulandbfi
>>>>>
>>>>>
>>>>> Geschäftsführer: Thomas Gebauer, Jan Zander
>>>>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>>>>> USt-ID. DE 246585501
>>>>