Hi Timo and Piotrek,
Thank you for the suggestions.
I have been trying to set up unit tests at the operator granularity, and
the blog post's testHarness examples certainly help a lot in this regard.

I have figured out my problem: an upstream session window operator can only
report the end of a session window once the watermark has passed
{lastObservedEvent + sessionTimeout}. However, my watermark was being
advanced periodically without taking this delay into account. It seems I
will have to delay this notification operator's watermark by sessionTimeout.
Another complication is that sessionTimeout is per-key, so I guess I will
have to implement a watermark assigner that extracts the delay period from
the data (similar to how DynamicEventTimeSessionWindows extracts the session
gap from each element).
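The delayed-watermark idea above can be modelled in plain Java with no Flink dependency, which makes the arithmetic easy to check in isolation. One caveat: Flink tracks watermarks per parallel stream partition, not per key, so an assigner cannot hold back the watermark for a single key. This sketch therefore takes a conservative option. It lags behind the largest timestamp seen by the largest session timeout seen. The class and method names (SessionAwareWatermark, onEvent, currentWatermark) are hypothetical. In a real job this logic would sit inside a periodic watermark assigner, such as an AssignerWithPeriodicWatermarks implementation.

```java
// Plain-Java model (no Flink dependency) of a watermark that lags the largest
// event timestamp by the largest per-key session timeout seen so far.
// SessionAwareWatermark, onEvent and currentWatermark are hypothetical names.
class SessionAwareWatermark {
    private final long maxOutOfOrderness;        // assumed bound on event disorder (ms)
    private long maxTimestamp = Long.MIN_VALUE;  // largest event timestamp observed
    private long maxTimeout = 0L;                // largest session timeout observed

    SessionAwareWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every element; sessionTimeout is assumed to be extracted from the element.
    void onEvent(long timestamp, long sessionTimeout) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        maxTimeout = Math.max(maxTimeout, sessionTimeout);
    }

    // Called periodically, e.g. once per auto-watermark interval.
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet, so no event-time progress
        }
        return maxTimestamp - maxOutOfOrderness - maxTimeout;
    }

    public static void main(String[] args) {
        SessionAwareWatermark wm = new SessionAwareWatermark(0L);
        wm.onEvent(100L, 30L); // key A: 30 ms session timeout
        wm.onEvent(150L, 10L); // key B: 10 ms session timeout
        System.out.println(wm.currentWatermark()); // 150 - 0 - 30 = 120
    }
}
```

The trade-off of this design is that keys with short timeouts must also wait for the longest timeout before event time advances for them.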

Also, if I do implement such an assigner, would it be helpful to add it to
Flink? I am happy to contribute if so. Any other comments/observations are
also welcome!

Thank you all for the help,
Manas


On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Manas,
>
> Adding to the response from Timo, if you don’t have unit tests/integration
> tests, I would strongly recommend setting them up, as it makes debugging
> and testing easier. You can read how to do it for your functions and
> operators here [1] and here [2].
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> [2]
> https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
>
> On 28 Apr 2020, at 18:45, Timo Walther <twal...@apache.org> wrote:
>
> Hi Manas,
>
> Reg. 1: I would recommend using a debugger in your IDE and checking which
> watermarks are travelling through your operators.
>
> Reg. 2: All event-time operations are only performed once the watermark
> has arrived from all parallel upstream instances. So roughly speaking, in machine time
> you can assume that the window is computed in watermark update intervals.
> However, "what is computed" depends on the timestamps of your events and
> how those are categorized in windows.
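Reg. 2 can be illustrated with a simplified plain-Java model of an operator that has several parallel inputs. This is not Flink code. The operator's event-time clock is the minimum of the latest watermark received on each input, and timers or windows can only fire when that minimum advances. Idle-input handling is deliberately left out, and CombinedWatermark is a hypothetical name.

```java
import java.util.Arrays;

// Simplified model of an operator's event-time clock with several parallel inputs:
// the minimum of the latest watermark received on each input. Idleness is ignored.
class CombinedWatermark {
    private final long[] perInput;           // latest watermark per input channel
    private long combined = Long.MIN_VALUE;  // the operator's current event time

    CombinedWatermark(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    // Returns true if the combined watermark advanced, i.e. event-time timers may fire.
    boolean onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark); // never move backwards
        long min = Arrays.stream(perInput).min().getAsLong();
        if (min > combined) {
            combined = min;
            return true;
        }
        return false;
    }

    long current() {
        return combined;
    }

    public static void main(String[] args) {
        CombinedWatermark clock = new CombinedWatermark(2);
        clock.onWatermark(0, 100L);          // input 1 has not reported yet
        System.out.println(clock.current()); // still Long.MIN_VALUE
        clock.onWatermark(1, 80L);
        System.out.println(clock.current()); // 80: the slower input holds it back
    }
}
```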
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> On 28.04.20 14:38, Manas Kale wrote:
>
> Hi David and Piotrek,
> Thank you both for your inputs.
> I tried an implementation with the algorithm Piotrek suggested and David's
> example. Although notifications are being generated with the watermark,
> subsequent transition events are being received after the watermark has
> crossed their timestamps. For example:
> state1 @ 100
> notification state1 @ 110
> notification state1 @ 120
> notification state1 @ 130   <----- shouldn't have emitted this
> state2 @ 125                <----- watermark is > 125 at this stage
> I think something might be subtly(?) wrong with how I have structured the
> upstream operators. The allowed lateness is 0 in the upstream watermark
> assigner, and I generate watermarks every x seconds.
> The operator that emits state transitions is constructed using the
> TumblingWindow approach I described in the first e-mail (so that I can
> compute at every watermark update). Note that I can use this approach for
> the state-transition operator because it only needs to emit transitions
> and nothing in between.
> So, two questions:
> 1. Any idea on what might be causing this incorrect watermark behaviour?
> 2. If I want to perform some computation only when the watermark updates,
> is using a watermark-aligned EventTimeTumblingWindow (meaning
> windowDuration = watermarkUpdateInterval) the correct way to do this?
> Regards,
> Manas
> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <da...@ververica.com> wrote:
>    Following up on Piotr's outline, there's an example in the
>    documentation of how to use a KeyedProcessFunction to implement an
>    event-time tumbling window [1]. Perhaps that can help you get started.
>    Regards,
>    David
>    [1]
>    https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
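The bookkeeping for a tumbling window built on a KeyedProcessFunction comes down to a little arithmetic. It must decide which window an event belongs to, where to put the event-time timer, and whether the event is already behind the watermark. The plain-Java sketch below assumes epoch-aligned windows and non-negative millisecond timestamps. TumblingWindowMath and its methods are hypothetical names, and the linked documentation example may differ in detail. With a watermark of 130, isLate(125, 130) is true. That is the situation of state2 @ 125 in the example earlier in this thread.

```java
// Window arithmetic for an event-time tumbling window built on a keyed process
// function. Assumes epoch-aligned windows and non-negative timestamps in ms.
class TumblingWindowMath {
    // Start of the window containing eventTime.
    static long windowStart(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec);
    }

    // Last millisecond of that window: registering an event-time timer here makes
    // it fire once the watermark reaches the end of the window.
    static long windowEnd(long eventTime, long durationMsec) {
        return windowStart(eventTime, durationMsec) + durationMsec - 1;
    }

    // A watermark of w means no more events with timestamp <= w are expected,
    // so such an event arrives after its window may already have fired.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(125L, 10L)); // 120
        System.out.println(windowEnd(125L, 10L));   // 129
        System.out.println(isLate(125L, 130L));     // true
    }
}
```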
>    On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:
>        Hi,
>        I’m not sure, but I don’t think there is an existing window that
>        would do exactly what you want. I would suggest going back to
>        the `KeyedProcessFunction` (or a custom operator?) and keeping a
>        MapState<TimeStamp, StateWithTimeStamp> currentStates field.
>        Your key would be, for example, the timestamp of the beginning of
>        your window. The value would be the latest state in this time
>        window, annotated with the timestamp at which this state was recorded.
>        On each element:
>        1. Determine the window’s begin ts (the key of the map).
>        2. If it’s the first element for that window, register an event-time
>        timer to publish results at that window’s end ts.
>        3. Look into `currentStates` to see whether it should be modified (if
>        your new element is newer, or is the first value for the given key).
>        On event-time timer firing:
>        1. Output the state matching this timer.
>        2. Check if there is a (more recent) value for the next window, and
>        if not:
>        3. Copy the value to the next window.
>        4. Register a timer for the next window to fire.
>        5. Clean up `currentStates` and remove the value for the no longer
>        needed key.
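Piotrek's steps can be simulated for a single key without Flink, which makes the carry-forward behaviour easy to unit test. In this sketch a TreeMap stands in for the MapState and a TreeSet of timestamps stands in for the timer service. advanceWatermark fires every timer at or below the new watermark, in timestamp order. All names (CarryForwardNotifier, Stamped, processElement, advanceWatermark, onTimer) are hypothetical. processElement and onTimer only mirror the roles of the KeyedProcessFunction methods and do not have their signatures. Notifications are labelled with the window start, and late elements are not treated specially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Single-key simulation of the carry-forward algorithm. TreeMap stands in for
// MapState and TreeSet for the event-time timer service. Hypothetical names.
class CarryForwardNotifier {
    static final class Stamped {
        final String state;
        final long ts; // time at which this state was recorded
        Stamped(String state, long ts) { this.state = state; this.ts = ts; }
    }

    private final long size;                                  // window size = notification period
    private final TreeMap<Long, Stamped> currentStates = new TreeMap<>(); // window start -> latest state
    private final TreeSet<Long> timers = new TreeSet<>();     // pending timers (window end ts)
    final List<String> output = new ArrayList<>();

    CarryForwardNotifier(long size) { this.size = size; }

    // On each element.
    void processElement(String state, long ts) {
        long start = ts - (ts % size);               // window begin ts = map key
        Stamped existing = currentStates.get(start);
        if (existing == null) {
            timers.add(start + size - 1);            // first element: timer at window end
        }
        if (existing == null || ts >= existing.ts) { // keep only the newest state
            currentStates.put(start, new Stamped(state, ts));
        }
    }

    // Fires every timer at or below the new watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // On event-time timer firing.
    private void onTimer(long timerTs) {
        long start = timerTs - size + 1;
        Stamped s = currentStates.get(start);
        if (s == null) {
            return;
        }
        output.add(s.state + "@" + start);           // output the state for this window
        long nextStart = start + size;
        if (!currentStates.containsKey(nextStart)) { // no newer value for the next window:
            currentStates.put(nextStart, s);         // carry the value forward
            timers.add(nextStart + size - 1);        // and register its timer
        }
        currentStates.remove(start);                 // clean up the fired window
    }

    public static void main(String[] args) {
        CarryForwardNotifier n = new CarryForwardNotifier(10L);
        n.processElement("state1", 100L);
        n.advanceWatermark(130L);
        n.processElement("state2", 135L);
        n.advanceWatermark(140L);
        System.out.println(n.output); // [state1@100, state1@110, state1@120, state2@130]
    }
}
```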
>        I hope this helps
>        Piotrek
>
>        On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:
>
>        Hi,
>        I have an upstream operator that outputs device state
>        transition messages with event timestamps. That is, it only
>        emits output when a transition takes place.
>        For example,
>        state1 @ 1 PM
>        state2 @ 2 PM
>        and so on.
>
>        *Using a downstream operator, I want to emit notification
>        messages as per some configured periodicity.* For example, if
>        periodicity = 20 min, in the above scenario this operator will
>        output:
>        state1 notification @ 1PM
>        state1 notification @ 1.20PM
>        state1 notification @ 1.40PM
>         ...
>
>        *Now the main issue is that I want this to be driven by the
>        /watermark/ and not by transition events received from
>        upstream.* That is, I would like to see notification events as
>        soon as the watermark crosses their timestamps, /not/ when the
>        next transition event arrives at the operator (which could be
>        hours later, as above).
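For testing, it can help to write the desired behaviour down as a small reference model and compare an operator's output against it. The sketch below assumes the transitions for one key are already sorted by event time. Following the example above, it anchors notifications at the transition time (1 PM, 1.20 PM, ...) rather than at aligned window boundaries. A notification at time t is expected once the watermark has reached t. NotificationOracle and expected are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Reference model of the desired notifications for one key: each state is
// announced at its transition time and then every `period` until the next
// transition, but only up to the current watermark. Hypothetical names.
class NotificationOracle {
    // states[i] was entered at times[i]; times must be sorted in ascending order.
    static List<String> expected(String[] states, long[] times, long period, long watermark) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < states.length; i++) {
            long next = (i + 1 < times.length) ? times[i + 1] : Long.MAX_VALUE;
            for (long t = times[i]; t < next && t <= watermark; t += period) {
                out.add(states[i] + "@" + t);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 1 PM and 2 PM expressed as minutes since midnight, 20-minute period.
        List<String> out = expected(
                new String[] {"state1", "state2"}, new long[] {780L, 840L}, 20L, 860L);
        System.out.println(out); // [state1@780, state1@800, state1@820, state2@840, state2@860]
    }
}
```

An oracle like this fits naturally into the unit tests recommended earlier in the thread. Feed the same transitions and watermarks to the operator under test and compare its output against the oracle.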
>
>        My first solution, using a KeyedProcessFunction and timers, did
>        not work as expected because the order in which transition
>        events arrived at this operator was non-deterministic. To
>        elaborate, assume a setAutoWatermarkInterval of 10 seconds.
>        If we get the transition events:
>        state1 @ 1 sec
>        state2 @ 3 sec
>        state3 @ 5 sec
>        state1 @ 8 sec
>        the order in which these events arrived at my
>        KeyedProcessFunction was not fixed. To solve this, these
>        messages need to be sorted on event time, which led me to my
>        second solution.
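A common way to restore event-time order is to buffer incoming events and release only those whose timestamps are at or before the current watermark, in timestamp order. Assuming no late data, no event that arrives afterwards can precede them. The plain-Java sketch below uses a PriorityQueue as the buffer. In a Flink job the buffer would instead have to live in keyed state and be drained from an event-time timer, which this sketch does not show. EventTimeSorter and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Buffers out-of-order events and releases them in event-time order once the
// watermark has passed them. Plain Java; hypothetical names.
class EventTimeSorter {
    static final class Event {
        final String state;
        final long ts;
        Event(String state, long ts) { this.state = state; this.ts = ts; }
        @Override public String toString() { return state + "@" + ts; }
    }

    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.ts, b.ts));

    void add(String state, long ts) {
        buffer.add(new Event(state, ts));
    }

    // Events at or before the watermark are complete (assuming no late data),
    // so they can be handed to the notification logic in timestamp order.
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        sorter.add("state3", 5L);
        sorter.add("state1", 1L);
        sorter.add("state1", 8L);
        sorter.add("state2", 3L);
        System.out.println(sorter.onWatermark(6L));  // [state1@1, state2@3, state3@5]
        System.out.println(sorter.onWatermark(10L)); // [state1@8]
    }
}
```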
>
>        My second solution, using an EventTimeTumblingWindow with size
>        = setAutoWatermarkInterval, also does not work. I sorted the
>        accumulated events in the window and applied the
>        notification-generation logic to them in order. However, I
>        assumed that windows are created even if there are no
>        elements. Since this is not the case, this solution generates
>        notifications only when the next state transition message
>        arrives, which could be hours later.
>
>        Does anyone have any suggestions on how I can implement this?
>        Thanks!
>