Hey Yun, thanks for your response. This method can work, but it has one problem I still have to deal with: until a sensor sends its first data, the stream processes no events for that key, so no timer is registered and the onTimer function is never called. I need to know whether a sensor starts in the 'off' state.
I thought of creating another stream that generates only the 'off' state for each sensor and merging the two streams, but I ran into an issue with multiple streams in the same environment. Do you have a nicer or built-in solution?

On Fri, Oct 1, 2021 at 5:22 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Sahar,
>
> I think the second method should work by using the keyed process function
> and the timers [1]. It might be implemented with some code like
>
> 1. On first receiving the data for a sensor id, register a processing
> timer at now() + 5 seconds.
> 2. On receiving the record for a sensor id, increment the value stored in
> the state.
> 3. When the timer is triggered, check the state to see whether any record
> was received, and then register a new timer at now() + 5 seconds.
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#the-keyedprocessfunction
>
> ------------------Original Mail ------------------
> *Sender:*Sahar Amgadi <sahar.amg...@3dsignals.com>
> *Send Date:*Fri Oct 1 21:52:29 2021
> *Recipients:*+user <user@flink.apache.org>
> *Subject:*Flink - dealing with missing events in keyBy window
>
>> Hey guys,
>> I have a Flink stream which receives sensor data every 5 seconds.
>> The data is being processed using "keyBy" in a "Sliding Window" method
>> using ProcessWindowFunction.
>> The key is the sensorID.
>> When one of the sensors does not send any data for more than 5 seconds, I
>> need the stream to recognize this behaviour.
>>
>> 2 things I had in mind:
>> 1st - maybe we could use a Session Window and recognize gaps of data for
>> each key,
>> but I still need the Sliding Window for processing bulks of exact minutes.
>>
>> 2nd - Is a Custom Trigger able to trigger the processing function
>> and send a custom event?
>> In that case, I can trigger an 'off' event every 5 seconds and deal with the
>> calculation in the processing function.
>>
>> Thanks
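
For reference, here is a rough plain-Java sketch of the counting logic in the three timer steps above, combined with the idea of seeding an initial 'off' entry for every sensor. It is not Flink API code: the class `SensorLivenessTracker` and its methods are hypothetical names, a single tick stands in for the per-key timers, and it assumes the full list of sensor IDs is known when the job starts. It only illustrates why seeding each key up front lets a sensor that never reports still be seen as 'off'.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of the per-sensor counter and timer logic (not Flink code). */
class SensorLivenessTracker {
    // Records counted per sensor since the last timer firing (stands in for keyed state).
    private final Map<String, Integer> countSinceLastTimer = new LinkedHashMap<>();

    /** Seed every known sensor with a zero count, so silent sensors are tracked from the start. */
    SensorLivenessTracker(List<String> knownSensorIds) {
        for (String id : knownSensorIds) {
            countSinceLastTimer.put(id, 0);
        }
    }

    /** Step 2: on each record, increment the counter stored for that sensor. */
    void onRecord(String sensorId) {
        countSinceLastTimer.merge(sensorId, 1, Integer::sum);
    }

    /**
     * Step 3: when the timer fires (assumed to be every 5 seconds), report every
     * sensor that sent nothing since the previous firing, then reset all counters.
     */
    List<String> onTimer() {
        List<String> offSensors = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : countSinceLastTimer.entrySet()) {
            if (entry.getValue() == 0) {
                offSensors.add(entry.getKey());
            }
            entry.setValue(0);
        }
        return offSensors;
    }
}
```

With sensors "s1" and "s2" seeded and only "s1" reporting before the first tick, `onTimer()` returns just "s2", even though "s2" never produced an event. In a real job the seeding would have to come from somewhere such as a second source of sensor IDs, which is exactly the multiple-streams part I am stuck on.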