Hi Manas,

Adding to the response from Timo, if you don’t have unit tests/integration tests, I would strongly recommend setting them up, as it makes debugging and testing easier. You can read how to do it for your functions and operators here [1] and here [2].
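Timo describes the watermark behaviour further down: event-time operations only run once the watermark has arrived from all parallel instances. That rule can be pinned down in a plain-Java unit test before moving on to Flink's own test harnesses from [1] and [2]. The sketch below is a model built on assumptions, not Flink code. `WatermarkTracker` is a hypothetical class that does three things:

- keeps the latest watermark per upstream channel,
- treats the minimum of those as the operator's current watermark,
- flags an element as late when its timestamp is not after that watermark. Inside a `KeyedProcessFunction` you would make the same comparison against `ctx.timerService().currentWatermark()`.

```java
import java.util.Arrays;

/**
 * Hypothetical plain-Java model (not a Flink API) of event-time progress in an
 * operator fed by several parallel upstream instances: the operator's watermark
 * is the minimum of the latest watermark seen on each input channel.
 */
final class WatermarkTracker {
    private final long[] channelWatermarks;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark has been received on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; a channel's watermark never moves backwards. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** Event time has only advanced as far as the slowest channel. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** An element is late if the watermark has already reached its timestamp. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

Suppose two upstream instances have sent watermarks 130 and 124. Then `currentWatermark()` is 124, and an element at 125 is still on time. Once the second instance also reaches 130, the same element counts as late. That matches Manas's `state2 @ 125` example. A test like this can help check whether the upstream watermarks, rather than the notification logic, are getting ahead of the data.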
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
[2] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html

> On 28 Apr 2020, at 18:45, Timo Walther <twal...@apache.org> wrote:
>
> Hi Manas,
>
> Reg. 1: I would recommend using a debugger in your IDE and checking which
> watermarks are travelling through your operators.
>
> Reg. 2: All event-time operations are only performed once the watermark
> has arrived from all parallel instances. So, roughly speaking, in machine time
> you can assume that the window is computed in watermark update intervals.
> However, "what is computed" depends on the timestamps of your events and how
> those are categorized in windows.
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> On 28.04.20 14:38, Manas Kale wrote:
>> Hi David and Piotrek,
>> Thank you both for your inputs.
>> I tried an implementation with the algorithm Piotrek suggested and David's
>> example. Although notifications are being generated with the watermark,
>> subsequent transition events are being received after the watermark has
>> crossed their timestamps. For example:
>> state1 @ 100
>> notification state1 @ 110
>> notification state1 @ 120
>> notification state1 @ 130 <----- shouldn't have emitted this
>> state2 @ 125 <----- watermark is > 125 at this stage
>> I think something might be subtly(?) wrong with how I have structured
>> upstream operators. The allowed lateness is 0 in the watermark assigner
>> upstream, and I generate watermarks every x seconds.
>> The operator that emits state transitions is constructed using the
>> TumblingWindow approach I described in the first e-mail (so that I can
>> compute at every watermark update).
>> Note that I can use this approach for the
>> state-transition operator because it only wants to emit transitions, and
>> nothing in between.
>> So, two questions:
>> 1. Any idea on what might be causing this incorrect watermark behaviour?
>> 2. If I want to perform some computation only when the watermark updates, is
>> using a watermark-aligned EventTimeTumblingWindow (meaning windowDuration =
>> watermarkUpdateInterval) the correct way to do this?
>> Regards,
>> Manas
>> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <da...@ververica.com> wrote:
>> Following up on Piotr's outline, there's an example in the
>> documentation of how to use a KeyedProcessFunction to implement an
>> event-time tumbling window [1]. Perhaps that can help you get started.
>> Regards,
>> David
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>> On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>> I’m not sure, but I don’t think there is an existing window that
>> would do exactly what you want. I would suggest going back to
>> the `keyedProcessFunction` (or a custom operator?) and having a
>> MapState<TimeStamp, StateWithTimeStamp> currentStates field.
>> Your key would be, for example, the timestamp of the beginning of
>> your window. The value would be the latest state in this time
>> window, annotated with the timestamp when this state was recorded.
>> On each element:
>> 1. you determine the window’s begin ts (key of the map)
>> 2. If it’s the first element, register an event time timer to
>>    publish results for that window’s end TS
>> 3. look into `currentStates` to see whether it should be modified
>>    (if your new element is newer, or is the first value for the given key)
>> On event time timer firing:
>> 1. output the state matching this timer
>> 2. Check if there is a (more recent) value for the next window, and
>>    if not:
>> 3. copy the value to the next window
>> 4. Register a timer for this window to fire
>> 5. Clean up `currentStates` and remove the value for the no longer
>>    needed key.
>> I hope this helps
>> Piotrek
>>> On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:
>>>
>>> Hi,
>>> I have an upstream operator that outputs device state
>>> transition messages with event timestamps, meaning it only
>>> emits output when a transition takes place.
>>> For example,
>>> state1 @ 1 PM
>>> state2 @ 2 PM
>>> and so on.
>>>
>>> *Using a downstream operator, I want to emit notification
>>> messages as per some configured periodicity.* For example, if
>>> periodicity = 20 min, in the above scenario this operator will
>>> output:
>>> state1 notification @ 1PM
>>> state1 notification @ 1.20PM
>>> state1 notification @ 1.40PM
>>> ...
>>>
>>> *Now the main issue is that I want this to be driven by the
>>> /watermark/ and not by transition events received from
>>> upstream.* Meaning I would like to see notification events as
>>> soon as the watermark crosses their timestamps; /not/ when the
>>> next transition event arrives at the operator (which could be
>>> hours later, as above).
>>>
>>> My first solution, using a keyedProcessFunction and timers, did
>>> not work as expected because the order in which transition
>>> events arrived at this operator was non-deterministic. To
>>> elaborate, assume a setAutoWatermarkInterval of 10 seconds.
>>> If we get transition events:
>>> state1 @ 1 sec
>>> state2 @ 3 sec
>>> state3 @ 5 sec
>>> state1 @ 8 sec
>>> the order in which these events arrived at my
>>> keyedProcessFunction was not fixed. To solve this, these
>>> messages need to be sorted on event time, which led me to my
>>> second solution.
>>>
>>> My second solution, using an EventTimeTumblingWindow with size
>>> = setAutoWatermarkInterval, also does not work. I sorted
>>> accumulated events in the window and applied
>>> notification-generation logic on them in order. However, I
>>> assumed that windows are created even if there are no
>>> elements. Since this is not the case, this solution generates
>>> notifications only when the next state transition message
>>> arrives, which could be hours later.
>>>
>>> Does anyone have any suggestions on how I can implement this?
>>> Thanks!
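For reference, here is a single-key, plain-Java simulation of the MapState-plus-timers outline Piotrek gave in the thread. It is a sketch under stated assumptions, not Piotrek's or Flink's code:

- `PeriodicNotifier` and `StateAt` are hypothetical names.
- A `TreeMap` stands in for `MapState`, and a `TreeSet` of timestamps stands in for the event-time timer service.
- Each timer fires at the window's last timestamp (`start + size - 1`), the convention used in the event-driven tutorial David linked.
- Notifications are stamped with the window start.
- Elements at or behind the watermark are simply dropped and counted.

In a real job the same steps would sit in a `KeyedProcessFunction`, using `ctx.timerService().registerEventTimeTimer(...)` and `onTimer(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical single-key simulation of Piotrek's outline (not Flink code). */
final class PeriodicNotifier {
    /** A device state together with the event time at which it was recorded. */
    static final class StateAt {
        final String state;
        final long timestamp;

        StateAt(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    private final long windowSize;                                     // notification period
    private final Map<Long, StateAt> currentStates = new TreeMap<>(); // window start -> latest state
    private final TreeSet<Long> timers = new TreeSet<>();              // pending event-time timers
    private long watermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();
    int droppedLate = 0;

    PeriodicNotifier(long windowSize) {
        this.windowSize = windowSize;
    }

    private long windowStart(long ts) {
        return ts - Math.floorMod(ts, windowSize);
    }

    /** "On each element", steps 1-3. */
    void processElement(String state, long ts) {
        if (ts <= watermark) {               // assumption: late elements are dropped
            droppedLate++;
            return;
        }
        long start = windowStart(ts);                             // step 1
        StateAt existing = currentStates.get(start);
        if (existing == null) {                                   // step 2: first value for this window
            timers.add(start + windowSize - 1);
        }
        if (existing == null || ts >= existing.timestamp) {      // step 3: keep the newest state
            currentStates.put(start, new StateAt(state, ts));
        }
    }

    /** Fires, in timestamp order, every timer at or below the new watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        // Caveat: every firing schedules the next window, so advancing to
        // Long.MAX_VALUE here would effectively never terminate.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** "On event time timer firing", steps 1-5. */
    private void onTimer(long timerTs) {
        long start = timerTs - windowSize + 1;
        StateAt current = currentStates.get(start);
        output.add(current.state + " @ " + start);                // step 1
        long nextStart = start + windowSize;
        if (!currentStates.containsKey(nextStart)) {              // step 2
            currentStates.put(nextStart, current);                // step 3: carry the state forward
            timers.add(nextStart + windowSize - 1);               // step 4
        }
        currentStates.remove(start);                              // step 5
    }
}
```

Take a period of 10, feed `state1` at 100 and advance the watermark to 131. This yields `state1 @ 100`, `state1 @ 110` and `state1 @ 120`. A `state2` element at 125 that arrives afterwards is counted in `droppedLate`, mirroring Manas's report. The notifications are driven purely by watermark progress, so no further transition event is needed to emit them.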