Hi Timo and Piotrek,

Thank you for the suggestions. I have been trying to set up unit tests at the operator granularity, and the blog post's testHarness examples certainly help a lot in this regard.
I understood my problem: an upstream session window operator can only report the end of a session window once the watermark has passed {lastObservedEvent + sessionTimeout}. However, my watermark was being updated periodically without taking this into account, so it seems I will have to delay this notification operator's watermark by sessionTimeout. Another complication is that sessionTimeout is per-key, so I guess I will have to implement a watermark assigner that extracts the delay period from the data (similar to DynamicEventTimeWindows). If I do implement such an assigner, would it be helpful to add it to Flink? I am happy to contribute if so.

Any other comments/observations are also welcome!

Thank you all for the help,
Manas

On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Manas,
>
> Adding to the response from Timo, if you don't have unit tests/integration
> tests, I would strongly recommend setting them up, as it makes debugging
> and testing easier. You can read how to do it for your functions and
> operators here [1] and here [2].
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> [2] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
>
> On 28 Apr 2020, at 18:45, Timo Walther <twal...@apache.org> wrote:
>
> Hi Manas,
>
> Reg. 1: I would recommend using a debugger in your IDE to check which
> watermarks are travelling through your operators.
>
> Reg. 2: All event-time operations are only performed once the watermark
> has arrived from all parallel instances. So, roughly speaking, in machine
> time you can assume that the window is computed at watermark update
> intervals. However, "what is computed" depends on the timestamps of your
> events and how those are categorized into windows.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> On 28.04.20 14:38, Manas Kale wrote:
>
> Hi David and Piotrek,
> Thank you both for your inputs.
> I tried an implementation with the algorithm Piotrek suggested and David's
> example. Although notifications are being generated with the watermark,
> subsequent transition events are being received after the watermark has
> crossed their timestamps. For example:
> state1 @ 100
> notification state1 @ 110
> notification state1 @ 120
> notification state1 @ 130 <----- shouldn't have emitted this
> state2 @ 125 <----- watermark is > 125 at this stage
>
> I think something might be subtly(?) wrong with how I have structured
> the upstream operators. The allowed lateness is 0 in the watermark assigner
> upstream, and I generate watermarks every x seconds.
> The operator that emits state transitions is constructed using the
> TumblingWindow approach I described in the first e-mail (so that I can
> compute at every watermark update). Note that I can use this approach for
> the state-transition operator because it only needs to emit transitions,
> and nothing in between.
>
> So, two questions:
> 1. Any idea what might be causing this incorrect watermark behaviour?
> 2. If I want to perform some computation only when the watermark updates,
> is using a watermark-aligned EventTimeTumblingWindow (meaning
> windowDuration = watermarkUpdateInterval) the correct way to do this?
>
> Regards,
> Manas
>
> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <da...@ververica.com> wrote:
>
> Following up on Piotr's outline, there's an example in the
> documentation of how to use a KeyedProcessFunction to implement an
> event-time tumbling window [1]. Perhaps that can help you get started.
>
> Regards,
> David
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>
> On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
> Hi,
> I'm not sure, but I don't think there is an existing window that
> would do exactly what you want.
> I would suggest going back to
> the `keyedProcessFunction` (or a custom operator?) and having a
> MapState<TimeStamp, StateWithTimeStamp> currentStates field.
> Your key would be, for example, the timestamp of the beginning of
> your window. The value would be the latest state in this time
> window, annotated with the timestamp at which this state was recorded.
>
> On each element:
> 1. Determine the window's begin ts (the key of the map).
> 2. If it's the first element, register an event-time timer to
> publish results for that window's end ts.
> 3. Look into `currentStates` to see if it should be modified (if
> your new element is newer, or is the first value for the given key).
>
> On event-time timer firing:
> 1. Output the state matching this timer.
> 2. Check if there is a (more recent) value for the next window, and
> if not:
> 3. Copy the value to the next window.
> 4. Register a timer for that window to fire.
> 5. Clean up currentStates and remove the value for the no longer
> needed key.
>
> I hope this helps,
> Piotrek
>
> On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:
>
> Hi,
> I have an upstream operator that outputs device state
> transition messages with event timestamps, meaning it only
> emits output when a transition takes place.
> For example,
> state1 @ 1 PM
> state2 @ 2 PM
> and so on.
>
> *Using a downstream operator, I want to emit notification
> messages as per some configured periodicity.* For example, if
> periodicity = 20 min, in the above scenario this operator will
> output:
> state1 notification @ 1 PM
> state1 notification @ 1.20 PM
> state1 notification @ 1.40 PM
> ...
>
> *Now the main issue is that I want this to be driven by the
> /watermark/ and not by transition events received from
> upstream.* Meaning I would like to see notification events as
> soon as the watermark crosses their timestamps; /not/ when the
> next transition event arrives at the operator (which could be
> hours later, as above).
>
> My first solution, using a keyedProcessFunction and timers, did
> not work as expected because the order in which transition
> events arrived at this operator was non-deterministic. To
> elaborate, assume a setAutoWatermarkInterval of 10 seconds.
> If we get the transition events:
> state1 @ 1 sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my
> keyedProcessFunction was not fixed. To solve this, these
> messages need to be sorted on event time, which led me to my
> second solution.
>
> My second solution, using an EventTimeTumblingWindow with size
> = setAutoWatermarkInterval, also does not work. I sorted the
> accumulated events in the window and applied the
> notification-generation logic to them in order. However, I
> assumed that windows are created even if there are no
> elements. Since this is not the case, this solution generates
> notifications only when the next state transition message
> arrives, which could be hours later.
>
> Does anyone have any suggestions on how I can implement this?
> Thanks!
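For reference, here is a minimal plain-Java sketch of the MapState-plus-timers approach Piotrek outlines above, for a single key. It deliberately uses no Flink classes so that it runs standalone: a TreeMap stands in for MapState, a TreeSet for the registered event-time timers, and advanceWatermark() for a watermark reaching the operator (a timer fires once the watermark is >= its timestamp). The class and method names (PeriodicStateNotifier, processElement, advanceWatermark) are hypothetical, and the assumptions that timestamps are non-negative and that late events are simply dropped belong to this sketch, not to the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical single-key simulation of the MapState + event-time-timer idea.
// No Flink API is used: TreeMap stands in for MapState, TreeSet for timers.
class PeriodicStateNotifier {

    // Latest state seen in a window, with the event timestamp it was recorded at.
    private static final class Latest {
        final long ts;
        final String state;
        Latest(long ts, String state) { this.ts = ts; this.state = state; }
    }

    private final long windowSize;
    private final TreeMap<Long, Latest> currentStates = new TreeMap<>(); // window start -> latest state
    private final TreeSet<Long> timers = new TreeSet<>();                // window-end timer timestamps
    private final List<String> output = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    PeriodicStateNotifier(long windowSize) {
        this.windowSize = windowSize;
    }

    // Called for each transition event; assumes non-negative timestamps.
    public void processElement(long ts, String state) {
        if (ts < watermark) {
            return; // assumption of this sketch: late events are dropped
        }
        long windowStart = ts - (ts % windowSize);               // 1. window's begin ts
        Latest current = currentStates.get(windowStart);
        if (current == null) {
            timers.add(windowStart + windowSize);                 // 2. first element: timer at window end
        }
        if (current == null || ts >= current.ts) {
            currentStates.put(windowStart, new Latest(ts, state)); // 3. keep the newest state
        }
    }

    // Advances the watermark and fires every timer whose timestamp is <= it,
    // including timers registered while earlier timers were firing.
    public void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long windowEnd) {
        long windowStart = windowEnd - windowSize;
        Latest latest = currentStates.get(windowStart);
        output.add(latest.state + " notification @ " + windowStart); // 1. output the state
        if (!currentStates.containsKey(windowEnd)) {                 // 2. no newer value for next window
            currentStates.put(windowEnd, latest);                    // 3. copy the value forward
            timers.add(windowEnd + windowSize);                      // 4. timer for that window
        }
        currentStates.remove(windowStart);                           // 5. clean up the fired window
    }

    public List<String> getOutput() {
        return output;
    }
}
```

With a window size of 20, calling processElement(0, "state1") and then advanceWatermark(65) yields notifications for the windows starting at 0, 20 and 40 without any further input, so they are driven by the watermark rather than by the next transition event. Within one window, the element with the larger event timestamp wins regardless of arrival order, which sidesteps the non-deterministic ordering described in the first solution.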