Following up on Piotr's outline, there's an example in the documentation of how to use a KeyedProcessFunction to implement an event-time tumbling window [1]. Perhaps that can help you get started.
Regards, David [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote: > Hi, > > I’m not sure, but I don’t think there is an existing window that would do > exactly what you want. I would suggest to go back to the > `keyedProcessFunction` (or a custom operator?), and have a > MapState<TimeStamp, StateWithTimeStamp> currentStates field. Your key would > be for example a timestamp of the beginning of your window. Value would be > the latest state in this time window, annotated with a timestamp when this > state was record. > > On each element: > > 1. you determine the window’s begin ts (key of the map) > 2. If it’s first element, register an event time timer to publish results > for that window’s end TS > 3. look into the `currentStates` if it should be modified (if your new > element is newer or first value for the given key) > > On even time timer firing > 1. output the state matching to this timer > 2. Check if there is a (more recent) value for next window, and if not: > > 3. copy the value to next window > 4. Register a timer for this window to fire > > 5. Cleanup currentState and remove value for the no longed needed key. > > I hope this helps > > Piotrek > > On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote: > > Hi, > I have an upstream operator that outputs device state transition messages > with event timestamps. Meaning it only emits output when a transition takes > place. > For example, > state1 @ 1 PM > state2 @ 2 PM > and so on. > > *Using a downstream operator, I want to emit notification messages as per > some configured periodicity.* For example, if periodicity = 20 min, in > the above scenario this operator will output : > state1 notification @ 1PM > state1 notification @ 1.20PM > state1 notification @ 1.40PM > ... > > *Now the main issue is that I want this to be driven by the watermark and > not by transition events received from upstream. *Meaning I would like to > see notification events as soon as the watermark crosses their timestamps; > *not* when the next transition event arrives at the operator (which could > be hours later, as above). > > My first solution, using a keyedProcessFunction and timers did not work as > expected because the order in which transition events arrived at this > operator was non-deterministic. To elaborate, assume a > setAutoWatermarkInterval of 10 second. > If we get transition events : > state1 @ 1sec > state2 @ 3 sec > state3 @ 5 sec > state1 @ 8 sec > the order in which these events arrived at my keyedProcessFunction was not > fixed. To solve this, these messages need to be sorted on event time, which > led me to my second solution. > > My second solution, using a EventTimeTumblingWindow with size = > setAutoWatermarkInterval, also does not work. I sorted accumulated events > in the window and applied notification-generation logic on them in order. > However, I assumed that windows are created even if there are no elements. > Since this is not the case, this solution generates notifications only when > the next state tranisition message arrives, which could be hours later. > > Does anyone have any suggestions on how I can implement this? > Thanks! > > > > >