Following up on Piotr's outline, there's an example in the documentation of
how to use a KeyedProcessFunction to implement an event-time tumbling
window [1]. Perhaps that can help you get started.

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example


On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I’m not sure, but I don’t think there is an existing window that would do
> exactly what you want. I would suggest to go back to the
> `keyedProcessFunction` (or a custom operator?), and have a
> MapState<TimeStamp, StateWithTimeStamp> currentStates field. Your key would
> be for example a timestamp of the beginning of your window. Value would be
> the latest state in this time window, annotated with a timestamp when this
> state was record.
>
> On each element:
>
> 1. you determine the window’s begin ts (key of the map)
> 2. If it’s first element, register an event time timer to publish results
> for that window’s end TS
> 3. look into the `currentStates` if it should be modified (if your new
> element is newer or first value for the given key)
>
> On even time timer firing
> 1. output the state matching to this timer
> 2. Check if there is a (more recent) value for next window, and if not:
>
> 3. copy the value to next window
> 4. Register a timer for this window to fire
>
> 5. Cleanup currentState and remove value for the no longed needed key.
>
> I hope this helps
>
> Piotrek
>
> On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:
>
> Hi,
> I have an upstream operator that outputs device state transition messages
> with event timestamps. Meaning it only emits output when a transition takes
> place.
> For example,
> state1 @ 1 PM
> state2 @ 2 PM
> and so on.
>
> *Using a downstream operator, I want to emit notification messages as per
> some configured periodicity.* For example, if periodicity = 20 min, in
> the above scenario this operator will output :
> state1 notification @ 1PM
> state1 notification @ 1.20PM
> state1 notification @ 1.40PM
>  ...
>
> *Now the main issue is that I want this to be driven by the watermark and
> not by transition events received from upstream. *Meaning I would like to
> see notification events as soon as the watermark crosses their timestamps;
> *not* when the next transition event arrives at the operator (which could
> be hours later, as above).
>
> My first solution, using a keyedProcessFunction and timers did not work as
> expected because the order in which transition events arrived at this
> operator was non-deterministic. To elaborate, assume a
> setAutoWatermarkInterval of 10 second.
> If we get transition events :
> state1 @ 1sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my keyedProcessFunction was not
> fixed. To solve this, these messages need to be sorted on event time, which
> led me to my second solution.
>
> My second solution, using a EventTimeTumblingWindow with size =
> setAutoWatermarkInterval, also does not work. I sorted accumulated events
> in the window and applied notification-generation logic on them in order.
> However, I assumed that windows are created even if there are no elements.
> Since this is not the case, this solution generates notifications only when
> the next state tranisition message arrives, which could be hours later.
>
> Does anyone have any suggestions on how I can implement this?
> Thanks!
>
>
>
>
>

Reply via email to