Hi everybody,

I just wanted to say thanks again for all your input and share the (surprisingly simple) solution that we came up with in the meantime:

class SensorRecordCounter extends KeyedProcessFunction<String, SensorRecord, SensorCount>{

private ValueState<SensorCount> state;
private long windowSizeMs = 60000L;

  public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("sensorCount", SensorCount.class));

public void processElement(SensorRecord sensorRecord, Context ctx, Collector<SensorCount> out) throws Exception {
        SensorCount count = state.value();
        if (count == null) {
            count = new SensorCount();

        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + windowSizeMs);

public void onTimer(long timestamp, OnTimerContext ctx, Collector<SensorCount> out) throws Exception {
        SensorCount count = state.value();

        if (count.getCount() <= 0) {


Best regards and a nice weekend


On 09.02.21 08:28, Arvid Heise wrote:
Hi Jan,

Another solution is to insert Heartbeat-events at the source for each sensor. The solution is very similar to how to advance watermarks when there are no elements in the respective source partition.

However, it's only easy to implement if you have your own source and know all sensors on application start. It might also be possible to implement if you use a new Source interface.

On Tue, Feb 9, 2021 at 7:20 AM Yun Gao <yungao...@aliyun.com <mailto:yungao...@aliyun.com>> wrote:


    I also think there should be different ways to achieve the target.
    For the first option listed previously,
    the pseudo-code roughly like

    class MyFunciton extends KeyedProcessFunction {
        ValueState<Integer> count;

        void open() {
           count = ... // Create the value state

        ​void processElement(T t, Context context, Collector collector) {
                ​Integer current = count.get();
                if (current == null) {
    context.timeService().registerTimer(30); // Register timer for the
    first time
                          current = 0;

                count.update(current + 1); // update the count

        void onTimer(...) {
             collector.collect(new Tuple2<>(getCurrentKey(), count.get());
    context.timeService().registerTimer(30);  // register the
    following timer

    1. For flink the state and timer are all bound to a key
    implicitly, thus I think they should
    not need to be bound manually.
    2. To clear the outdated state, it could be cleared via
    count.clear(); if it has been 0
    for a long time. There are different ways to count the interval,
    like register another timer
    and clear the timer when received the elements or update the
    counter to -1, -2... to mark
    how much timer it has passed.


        ------------------Original Mail ------------------
        *Sender:*Khachatryan Roman <khachatryan.ro...@gmail.com
        *Send Date:*Tue Feb 9 02:35:20 2021
        *Recipients:*Jan Brusch <jan.bru...@neuland-bfi.de
        *CC:*Yun Gao <yungao...@aliyun.com
        <mailto:yungao...@aliyun.com>>, user <user@flink.apache.org
        *Subject:*Re: Sliding Window Count: Tricky Edge Case / Count
        Zero Problem


            Probably another solution would be to register a timer
            (using KeyedProcessFunction) once we see an element after
            keyBy. The timer will fire in windowIntervalMs. Upon
            firing, it will emit a dummy element which will be ignored
            (or subtracted) in the end.
            Upon receiving each new element, the function will shift
            the timer accordingly.


            On Mon, Feb 8, 2021 at 10:50 AM Jan Brusch
            <mailto:jan.bru...@neuland-bfi.de>> wrote:

                Hi Yun,

                thanks for your reply.

                I do agree with your point about standard windows
                being for high level operations and the lower-level
                apis offering a rich toolset for most advanced use cases.

                I have tried to solve my problem with
                keyedProcessFunctions also but was not able to get it
                to work for two reasons:

                1) I was not able to set up a combination of
                ValueState, Timers and Triggers that emulated a
                sliding window with a rising and falling count
                (including 0) good enough.

                2) Memory Leak: States / Windows should be cleared
                after a certain time of being at count 0 in order to
                prevent an infinitely rising of ValueStates (that are
                not needed anymore)

                Can you maybe please elaborate in pseudocode how you
                would envision your solution?

                Best regards


                On 08.02.21 05:31, Yun Gao wrote:

                    Hi Jan,

                    From my view, I think in Flink Window should be as
                    a "high-level" operation for some kind
                    of aggregation operation and if it could not
                    satisfy the requirements, we could at least turn to
                    using the "low-level" api by using

                    In this case, we could use a ValueState to store
                    the current value for each key, and increment
                    the value on each element. Then we could also
                    register time for each key on receiving the first
                    element for this key,  and in the onTimer
                    callback, we could send the current state value,
                    the value to 0 and register another timer for this
                    key after 30s.



                        ------------------Original Mail ------------------
                        *Sender:*Jan Brusch
                        *Send Date:*Sat Feb 6 23:44:00 2021
                        *Recipients:*user <user@flink.apache.org>
                        *Subject:*Sliding Window Count: Tricky Edge
                        Case / Count Zero Problem

                            I was recently working on a problem where we wanted 
to implement a

                            simple count on a sliding window, e.g. "how many 
messages of a certain

                            type were emitted by a certain type of sensor in the 
last n minutes".

                            Which sounds simple enough in theory:

                                 .keyBy(//EmitterType + MessageType)

                                 .map(_ => 1)
                                 .reduce((x,y) => x + y)

                            But there is a tricky edge case: The downstream 
systems will never know

                            when the count for a certain key goes back to 0, 
which is important for

                            our use case. The technical reason being that flink 
doesn't open a

                            window if there are no entries, i.e. a window with 
count 0 doesn't exist

                            in flink.

                            We came up with the following solution for the time 

                                 .keyBy(//EmitterType + MessageType)
                                 .evictor(// CustomEvictor: Evict all messages 
older than n minutes

                            BEFORE processing the window)
                                 .process(// CustomCounter: Count all Messages 
in Window State);

                            In the case of zero messages in the last n minutes, 
all messages will be

                            evicted from the window and the process-function 
will get triggered one

                            last time on the now empty window, so we can 
produce a count of 0.

                            I have two problems, though, with this solution:
                            1) It is computationally inefficient for a simple 
count, as custom

                            process functions will always keep all messages in 
state. And, on every

                            trigger all elements will have to be touched twice: 
To compare the

                            timestamp and to count.
                            2) It does seem like a very roundabout solution to 
a simple problem.

                            So, I was wondering if there was a more efficient or 

                            approach to this. Sorry for the long writeup, but I 
would love to hear

                            your takes.

                            Best regards

-- neuland  – Büro für Informatik GmbH
                            Konsul-Smidt-Str. 8g, 28217 Bremen

                            Telefon (0421) 380107 57
                            Fax (0421) 380107 99


                            Geschäftsführer: Thomas Gebauer, Jan Zander
                            Registergericht: Amtsgericht Bremen, HRB 23395 HB
                            USt-ID. DE 246585501

                -- neuland  – Büro für Informatik GmbHKonsul-Smidt-Str. 8g, 28217 BremenTelefon (0421) 
380107 57Fax (0421) 380107 99https://www.neuland-bfi.de  
<https://xing.com/company/neulandbfi>Geschäftsführer: Thomas Gebauer, Jan ZanderRegistergericht: 
Amtsgericht Bremen, HRB 23395 HBUSt-ID. DE 246585501

neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Telefon (0421) 380107 57
Fax (0421) 380107 99


Geschäftsführer: Thomas Gebauer, Jan Zander
Registergericht: Amtsgericht Bremen, HRB 23395 HB
USt-ID. DE 246585501

Reply via email to