Thanks for the detailed explanation! I removed my watermark implementation, which is not necessary in my case. I will only use a watermark assigner if I am dealing with out-of-order events.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Wed, Aug 21, 2019 at 9:09 PM David Anderson <da...@ververica.com> wrote:

> What Watermarks do is to advance the event time clock. You can
> consider a Watermark(t) as an assertion about the completeness of the
> stream -- it marks a point in the stream and says that at that point,
> the stream is (probably) now complete up to time t.
>
> The autoWatermarkInterval determines how often new Watermarks are
> created -- or in other words, how often the event-time clock will be
> able to move forward. From what you've presented, it seems like you
> can leave this at its default, which is 200 msec. This means that five
> times a second, as your application runs, each parallel instance will
> create a new watermark (assuming there's been new data and that the
> event time clock can be advanced).
>
> getCurrentWatermark() should NOT be implemented in terms of
> System.currentTimeMillis -- you do not want your watermarking to
> depend on the current processing time if you can possibly avoid it.
> Part of the beauty of event time processing is being able to run your
> application on historic data as well as live, real-time data, and this
> is only possible if your watermarks depend on timestamps recorded in
> the events, rather than System.currentTimeMillis.
>
> You should also try to decouple your watermarking strategy from the
> specific processing you intend to do later, downstream. The primary
> concern you need to have when implementing the watermarking is to
> consider how much out-of-orderness your data may have.
> A typical timestamp assigner and watermark generator will look something like
> this, assuming that your event stream will have its timestamps at most
> 10 seconds out of order, and that your events have a timestamp field:
>
>     DataStream<MyEvent> withTimestampsAndWatermarks =
>         stream.assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>
>                 @Override
>                 public long extractTimestamp(MyEvent element) {
>                     return element.timestamp;
>                 }
>             });
>
> As for your specific application requirements, you might find it
> simpler to rely on State Time-to-Live [1] rather than clearing state
> yourself.
>
> There's no need to retain the state until the windowed join is
> completed, since the operator executing the join can't access the
> state in the CoProcessFunction. The CoProcessFunction should clear the
> state whenever it is done with it; no other part of your job will
> access it.
>
> If there is a risk that the CoProcessFunction will create state that
> isn't freed, and you don't for some reason find State TTL a good
> solution for this, then you can use either a processing time or event
> time timer to trigger a call to onTimer in which you can free the
> state. For example,
>
>     timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);
>
> registers an event time timer for 60 seconds after the timestamp in an
> event -- meaning, take the event's timestamp, add 60 seconds, and wait
> until the current Watermark has surpassed that point in time.
>
> The Flink training website has tutorials [2] and exercises [3] on these
> topics.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
> [2] https://training.ververica.com/lessons/event-time-watermarks.html
> [3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html
>
>
> On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am a little confused about watermarks in Flink.
> >
> > My application is using EventTime. My sources call
> > ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a
> > CoProcessFunction which merges the two streams. I keep state in this
> > function and I want to clear this state every time the window of my
> > next operator is triggered. The next operator is a join which uses a
> > window of 1 minute [1].
> >
> >     stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction);
> >     stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction);
> >     stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print();
> >
> > I am not sure whether I have to use
> > env.getConfig().setAutoWatermarkInterval(60 seconds), or whether I have to add
> > .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks()) and
> > write the logic in the getCurrentWatermark() method. Since I want
> > a watermark every 60 seconds, I guess this method (getCurrentWatermark())
> > should have "return new Watermark(System.currentTimeMillis() + 60000);",
> > but should it be - or +?
> >
> > Then, in the CoProcessFunction, what time should I pass to
> > context.timerService().registerEventTimeTimer(), and what logic
> > should I use in the onTimer() method?
> >
> > [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
> >
> > Thanks,
> > Felipe
> > --
> > -- Felipe Gutierrez
> > -- skype: felipe.o.gutierrez
> > -- https://felipeogutierrez.blogspot.com
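
For readers following the thread, here is a minimal plain-Java sketch of the two rules David describes: a bounded-out-of-orderness watermark trails the highest event timestamp seen so far by a fixed bound, and an event time timer fires only once the watermark has reached the timer's timestamp. It has no Flink dependency. The class `WatermarkSketch` and its methods (`onEvent`, `currentWatermark`, `registerTimer`, `firedTimers`) are hypothetical names made up for illustration, not Flink APIs. As a simplification, the sketch recomputes the watermark on every event, whereas Flink emits periodic watermarks at the autoWatermarkInterval and delivers timers through onTimer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical stand-in for an event-time clock; NOT a Flink API. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    // Highest event timestamp observed so far; MIN_VALUE means "no events yet".
    private long maxTimestampSeen = Long.MIN_VALUE;
    // Registered timer timestamps, kept sorted so the earliest fires first.
    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    private final List<Long> firedTimers = new ArrayList<>();

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp, then fires every timer the watermark has reached. */
    void onEvent(long eventTimestampMs) {
        // The clock depends only on timestamps carried by the events,
        // never on System.currentTimeMillis().
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        long watermark = currentWatermark();
        while (!pendingTimers.isEmpty() && pendingTimers.first() <= watermark) {
            firedTimers.add(pendingTimers.pollFirst());
        }
    }

    /** Watermark = highest timestamp seen minus the allowed out-of-orderness. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, the clock has not started
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Analogous in spirit to registering an event time timer. */
    void registerTimer(long timestampMs) {
        pendingTimers.add(timestampMs);
    }

    /** Timestamps of timers that have fired, in firing order. */
    List<Long> firedTimers() {
        return firedTimers;
    }
}
```

With a 10-second bound, an event at t = 100000 ms puts the watermark at 90000 ms, and a late event at t = 95000 ms leaves it unchanged. A timer registered for 100000 + 60000 = 160000 ms stays pending until an event with a timestamp of at least 170000 ms arrives. This is why a state-cleanup timer set to "event time + 60 seconds" actually fires a little later than 60 seconds of event time when out-of-orderness is allowed.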