Hi Roman,

Thanks for your reply.

Don't timers remove themselves after firing?

Apart from that, the idea is indeed to have one timer per element: we count one up whenever an element comes in and count one down exactly <windowsize> later. That way we emulate a sliding window without the "hops" at fixed intervals and always have a real-time running count of the elements in the last <windowsize>. The price is one timer per element, which is manageable for our use case (large window size, a LOT of sensors, but relatively few elements per sensor). In fact, for our use case this solution is much more efficient than a sliding window.
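
A minimal plain-Java sketch of this count-up/count-down idea may help to see the trade-off. It is a model, not Flink code: the class `RunningWindowCount` and its methods are made up for illustration, the queue of expiry times stands in for the per-element timers, and `advanceTo` stands in for time (or the watermark) moving forward. It assumes elements arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model (not Flink API) of one key's running count over the last windowSizeMs. */
class RunningWindowCount {
    private final long windowSizeMs;
    // One pending "timer" per element: the time at which that element leaves the window.
    private final Deque<Long> expiries = new ArrayDeque<>();
    private int count = 0;

    RunningWindowCount(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** An element arrives: count one up and schedule its count-down. Returns the new count. */
    int onElement(long timestampMs) {
        count++;
        expiries.addLast(timestampMs + windowSizeMs);
        return count;
    }

    /** Time advances: every due timer counts one down. Returns the counts emitted, in order. */
    List<Integer> advanceTo(long nowMs) {
        List<Integer> emitted = new ArrayList<>();
        while (!expiries.isEmpty() && expiries.peekFirst() <= nowMs) {
            expiries.removeFirst();
            count--;
            emitted.add(count);
        }
        return emitted;
    }

    /** Number of timers currently pending, i.e. the memory cost of this approach. */
    int pendingTimers() {
        return expiries.size();
    }
}
```

The number of pending timers always equals the current count, which is why the approach stays cheap when each key has few elements per window.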


Best regards

Jan


On 02.03.21 20:40, Roman Khachatryan wrote:
Hi Jan,

Thanks for sharing your solution.
You probably also want to remove the previously created timer(s) in processElement, so that you don't end up with one timer per element.
For that, you can store the previous timer's timestamp (in function state).
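
As a rough illustration of the "one timer per key" variant, here is a plain-Java model, not Flink code. The class `SingleTimerPerKey` and its methods are hypothetical; the map stands in for keyed state holding the previous timer's timestamp. In a KeyedProcessFunction the overwrite would roughly correspond to calling `deleteEventTimeTimer` with the stored timestamp and then `registerEventTimeTimer` with the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink API): at most one pending timer per key, moved on every element. */
class SingleTimerPerKey {
    private final long windowSizeMs;
    // Stands in for keyed state that remembers the previously registered timer.
    private final Map<String, Long> pendingTimer = new HashMap<>();

    SingleTimerPerKey(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Replaces the key's previous timer (if any) with one at timestampMs + windowSizeMs. */
    void onElement(String key, long timestampMs) {
        // put() overwrites the old entry: "delete the old timer, register the new one".
        pendingTimer.put(key, timestampMs + windowSizeMs);
    }

    /** Fires due timers and returns the keys that saw no element for windowSizeMs. */
    List<String> advanceTo(long nowMs) {
        List<String> idleKeys = new ArrayList<>();
        pendingTimer.entrySet().removeIf(entry -> {
            if (entry.getValue() <= nowMs) {
                idleKeys.add(entry.getKey());
                return true;
            }
            return false;
        });
        return idleKeys;
    }

    int pendingTimers() {
        return pendingTimer.size();
    }
}
```

This variant only reports the moment a key has been silent for a full window (its count is back to 0); it does not produce the intermediate decrements.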

Regards,
Roman


On Fri, Feb 26, 2021 at 10:29 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

    Hi everybody,

    I just wanted to say thanks again for all your input and share the
    (surprisingly simple) solution that we came up with in the meantime:

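    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    class SensorRecordCounter
            extends KeyedProcessFunction<String, SensorRecord, SensorCount> {

        private ValueState<SensorCount> state;
        private long windowSizeMs = 60000L;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("sensorCount", SensorCount.class));
        }

        @Override
        public void processElement(SensorRecord sensorRecord, Context ctx,
                Collector<SensorCount> out) throws Exception {
            SensorCount count = state.value();
            if (count == null) {
                count = new SensorCount();
                count.setSensorID(sensorRecord.getSensorID());
                count.setCount(0);
            }
            // Count one up and emit the new running count
            count.increase();
            state.update(count);
            out.collect(count);

            // Schedule the matching count-down exactly one window size later.
            // Note: Flink keeps at most one timer per key and timestamp, so
            // elements with identical timestamps share a single timer.
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + windowSizeMs);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                Collector<SensorCount> out) throws Exception {
            // Count one down and emit the new running count
            SensorCount count = state.value();
            count.decrease();
            state.update(count);
            out.collect(count);

            // Drop the state once nothing is left in the window
            if (count.getCount() <= 0) {
                state.clear();
            }
        }
    }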


    Best regards and a nice weekend

    Jan


    On 09.02.21 08:28, Arvid Heise wrote:
    Hi Jan,

    Another solution is to insert heartbeat events at the source for
    each sensor. The solution is very similar to how watermarks are
    advanced when there are no elements in the respective source
    partition.

    However, it's only easy to implement if you have your own source
    and know all sensors on application start. It might also be
    possible to implement if you use the new Source interface.

    On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao...@aliyun.com> wrote:


        Hi,

        I also think there are different ways to achieve the
        target. For the first option listed previously,
        the pseudocode would look roughly like this:

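        // Assumes String keys and processing time; emits the count every 30 s.
        class MyFunction<T>
                extends KeyedProcessFunction<String, T, Tuple2<String, Integer>> {
            private static final long INTERVAL_MS = 30_000L;
            private transient ValueState<Integer> count;

            @Override
            public void open(Configuration parameters) {
                // Create the value state
                count = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Integer.class));
            }

            @Override
            public void processElement(T t, Context context,
                    Collector<Tuple2<String, Integer>> collector) throws Exception {
                Integer current = count.value();
                if (current == null) {
                    // Register the timer for the first time
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer(now + INTERVAL_MS);
                    current = 0;
                }

                count.update(current + 1); // update the count
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext context,
                    Collector<Tuple2<String, Integer>> collector) throws Exception {
                collector.collect(new Tuple2<>(context.getCurrentKey(), count.value()));
                // Register the following timer
                context.timerService().registerProcessingTimeTimer(timestamp + INTERVAL_MS);
            }
        }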

        1. In Flink, state and timers are both bound to the current key
        implicitly, so I don't think they need to be bound manually.
        2. To clean up outdated state, you can call count.clear() once the
        count has been 0 for a long time. There are different ways to
        measure that interval, e.g. registering another timer and deleting
        it whenever an element arrives, or setting the counter to -1, -2, ...
        to mark how many timers have fired since the last element.
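
The "-1, -2, ..." idea from point 2 could be sketched as follows. This is a plain-Java model of a single key, not Flink code: the class `IdleAwareCounter` and the parameter `maxIdleIntervals` are hypothetical, a `null` value stands in for cleared state, and `onTimer` stands in for the periodic timer callback. It assumes the count is reset after every emission.

```java
/** Plain-Java model (not Flink API) of a per-interval count with idle-based cleanup. */
class IdleAwareCounter {
    private final int maxIdleIntervals;
    // null: state cleared; > 0: elements seen in this interval;
    // 0, -1, -2, ...: number of consecutive intervals without elements, negated.
    private Integer value = null;

    IdleAwareCounter(int maxIdleIntervals) {
        this.maxIdleIntervals = maxIdleIntervals;
    }

    /** An element arrives: any idle marker is discarded and counting starts again. */
    void onElement() {
        value = (value == null || value <= 0) ? 1 : value + 1;
    }

    /** The timer fires. Returns the emitted count, or null if there is no state left. */
    Integer onTimer() {
        if (value == null) {
            return null;
        }
        if (value > 0) {
            int emitted = value;
            value = 0; // reset for the next interval
            return emitted;
        }
        // No element in this interval: emit 0 and record one more idle interval.
        value = value - 1;
        if (-value >= maxIdleIntervals) {
            value = null; // clear the state; no further timer would be registered
        }
        return 0;
    }

    boolean hasState() {
        return value != null;
    }
}
```

With `maxIdleIntervals = 2`, a key that stops sending emits its last count, then two zeros, and is then dropped from state.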


        Best,
         Yun




            ------------------ Original Mail ------------------
            Sender: Khachatryan Roman <khachatryan.ro...@gmail.com>
            Send Date: Tue Feb 9 02:35:20 2021
            Recipients: Jan Brusch <jan.bru...@neuland-bfi.de>
            CC: Yun Gao <yungao...@aliyun.com>, user <user@flink.apache.org>
            Subject: Re: Sliding Window Count: Tricky Edge Case / Count Zero Problem

                Hi,

                Another solution would probably be to register a
                timer (using KeyedProcessFunction) once we see an
                element after keyBy. The timer will fire after
                windowIntervalMs. Upon firing, it will emit a dummy
                element, which will be ignored (or subtracted) in the end.
                Upon receiving each new element, the function will
                shift the timer accordingly.

                Regards,
                Roman


                On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

                    Hi Yun,

                    thanks for your reply.

                    I do agree with your point that standard windows
                    are meant for high-level operations and that the
                    lower-level APIs offer a rich toolset for most
                    advanced use cases.

                    I have also tried to solve my problem with a
                    KeyedProcessFunction, but was not able to
                    get it to work for two reasons:

                    1) I was not able to set up a combination of
                    ValueState, Timers and Triggers that emulated a
                    sliding window with a rising and falling count
                    (including 0) well enough.

                    2) Memory leak: States / windows should be
                    cleared after a certain time of being at count 0
                    in order to prevent an unbounded growth of
                    ValueStates (that are not needed anymore).


                    Could you please elaborate in pseudocode how
                    you would envision your solution?


                    Best regards

                    Jan

                    On 08.02.21 05:31, Yun Gao wrote:

                        Hi Jan,

                        From my point of view, a window in Flink is a
                        "high-level" operation for certain kinds of
                        aggregation. If it cannot satisfy the
                        requirements, we can always fall back to the
                        "low-level" API, i.e. KeyedProcessFunction[1].

                        In this case, we could use a ValueState to
                        store the current value for each key and
                        increment the value on each element. We could
                        also register a timer for each key on receiving
                        the first element for this key. In the onTimer
                        callback, we could emit the current state
                        value, reset the value to 0 and register another
                        timer for this key 30s later.

                        Best,
                         Yun



                        [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-keyedprocessfunction

                            ------------------ Original Mail ------------------
                            Sender: Jan Brusch <jan.bru...@neuland-bfi.de>
                            Send Date: Sat Feb 6 23:44:00 2021
                            Recipients: user <user@flink.apache.org>
                            Subject: Sliding Window Count: Tricky Edge Case / Count Zero Problem

                                Hi,
                                I was recently working on a problem where we wanted to implement a
                                simple count on a sliding window, e.g. "how many messages of a certain
                                type were emitted by a certain type of sensor in the last n minutes".

                                Which sounds simple enough in theory:

                                messageStream
                                     .keyBy(/* EmitterType + MessageType */)
                                     .window(SlidingProcessingTimeWindows.of(Time.minutes(n), Time.seconds(30)))
                                     .map(_ => 1)
                                     .reduce((x,y) => x + y)
                                     .addSink(...)

                                But there is a tricky edge case: The downstream systems will never know
                                when the count for a certain key goes back to 0, which is important for
                                our use case. The technical reason is that Flink doesn't open a
                                window if there are no entries, i.e. a window with count 0 doesn't exist
                                in Flink.
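
To illustrate why no 0 ever shows up, here is a small plain-Java model of sliding-window counting in which a window only comes into existence when an element is assigned to it. It is a sketch, not Flink's implementation: the class `SlidingWindowModel` is made up, windows are aligned to multiples of the slide (no offset), and timestamps are assumed to be non-negative.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Flink API): sliding windows exist only if an element falls into them. */
class SlidingWindowModel {

    /** Returns window end -> count for windows of sizeMs that slide by slideMs. */
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long sizeMs, long slideMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            // Start of the latest window that contains ts
            long lastStart = ts - (ts % slideMs);
            // Walk back through all earlier windows that still contain ts
            for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
                counts.merge(start + sizeMs, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For a single element at 10 s with a 60 s window and a 30 s slide, the result has entries for the windows ending at 30 s and 60 s only. There is no entry for the window ending at 90 s, so downstream never sees the count drop back to 0.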

                                We came up with the following solution for the time being:

                                messageStream
                                     .keyBy(/* EmitterType + MessageType */)
                                     .window(GlobalWindows.create())
                                     .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
                                     .evictor(/* CustomEvictor: evict all messages older than n minutes BEFORE processing the window */)
                                     .process(/* CustomCounter: count all messages in window state */)
                                     .addSink(...)

                                In the case of zero messages in the last n minutes, all messages will be
                                evicted from the window and the process function will get triggered one
                                last time on the now empty window, so we can produce a count of 0.
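
The evict-then-count behaviour can be modelled in a few lines of plain Java. This is a sketch of the idea only, not the actual evictor or process function: the class `EvictThenCount` and its methods are hypothetical, and `onTrigger` stands in for one firing of the 30-second trigger.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java model (not Flink API) of "evict everything older than n, then count the rest". */
class EvictThenCount {
    private final long windowSizeMs;
    // Every element stays in state until it is evicted.
    private final Deque<Long> timestamps = new ArrayDeque<>();

    EvictThenCount(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    void add(long timestampMs) {
        timestamps.addLast(timestampMs);
    }

    /** One trigger firing: evict outdated elements first, then count what is left. */
    int onTrigger(long nowMs) {
        // Pass 1: compare every timestamp against the window boundary
        timestamps.removeIf(ts -> ts <= nowMs - windowSizeMs);
        // Pass 2: count the survivors (a custom process function would iterate them)
        return timestamps.size();
    }

    int storedElements() {
        return timestamps.size();
    }
}
```

Once the last element has been evicted, the next firing returns 0, which is the value a plain sliding window never emits.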

                                I have two problems, though, with this solution:
                                1) It is computationally inefficient for a simple count, as custom
                                process functions will always keep all messages in state. And, on every
                                trigger, all elements will have to be touched twice: to compare the
                                timestamp and to count.
                                2) It does seem like a very roundabout solution to a simple problem.

                                So, I was wondering if there was a more efficient or "Flink-like"
                                approach to this. Sorry for the long writeup, but I would love to hear
                                your takes.


                                Best regards
                                Jan

                                --
                                neuland – Büro für Informatik GmbH
                                Konsul-Smidt-Str. 8g, 28217 Bremen

                                Phone (0421) 380107 57
                                Fax (0421) 380107 99
                                https://www.neuland-bfi.de

                                https://twitter.com/neuland
                                https://facebook.com/neulandbfi
                                https://xing.com/company/neulandbfi

                                Managing directors: Thomas Gebauer, Jan Zander
                                Registration court: Amtsgericht Bremen, HRB 23395 HB
                                VAT ID: DE 246585501
