Thanks for the detailed explanation! I removed my implementation of the
watermark, which is not necessary in my case. I will only use watermarks
when I am dealing with out-of-order events.

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


On Wed, Aug 21, 2019 at 9:09 PM David Anderson <da...@ververica.com> wrote:

> What Watermarks do is advance the event-time clock. You can
> consider a Watermark(t) as an assertion about the completeness of the
> stream: it marks a point in the stream and says that at that point,
> the stream is (probably) now complete up to time t.
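To make the "completeness assertion" concrete, here is a minimal plain-Java sketch. It is not Flink code; the class name EventTimeClock is hypothetical. The clock only moves forward, and once Watermark(t) has been seen, any later event whose timestamp is <= t contradicts that assertion and is treated as late.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an operator's event-time clock (not Flink's API).
class EventTimeClock {
    private long currentWatermark = Long.MIN_VALUE;
    private final List<Long> lateEvents = new ArrayList<>();

    // A Watermark(t) asserts: no more events with timestamp <= t are expected.
    void onWatermark(long t) {
        // The event-time clock never moves backwards.
        currentWatermark = Math.max(currentWatermark, t);
    }

    // Returns true if the event is on time, false if it violates the watermark's assertion.
    boolean onEvent(long timestamp) {
        if (timestamp <= currentWatermark) {
            lateEvents.add(timestamp);
            return false;
        }
        return true;
    }

    long currentWatermark() { return currentWatermark; }

    List<Long> lateEvents() { return lateEvents; }
}
```

Feeding it Watermark(5000) and then an event stamped 4000 reports that event as late; a later Watermark(3000) leaves the clock at 5000.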
>
> The autoWatermarkInterval determines how often new Watermarks are
> created, or in other words, how often the event-time clock will be
> able to move forward. From what you've presented, it seems like you
> can leave this at its default, which is 200 msec. This means that five
> times a second, as your application runs, each parallel instance will
> create a new watermark (assuming there's been new data and that the
> event-time clock can be advanced).
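In Flink 1.8 the interval is set in milliseconds with env.getConfig().setAutoWatermarkInterval(200), and choosing event time via env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) already applies the 200 ms default. The plain-Java sketch below (hypothetical PeriodicWatermarkEmitter, a simplification rather than Flink's operator) models one tick: the assigner's current watermark is queried, and a Watermark is forwarded only if it advances the clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical model of periodic watermark emission (not Flink's operator).
class PeriodicWatermarkEmitter {
    private final long intervalMillis;
    private final LongSupplier currentWatermark; // stands in for getCurrentWatermark()
    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkEmitter(long intervalMillis, LongSupplier currentWatermark) {
        this.intervalMillis = intervalMillis;
        this.currentWatermark = currentWatermark;
    }

    // Number of ticks in a span of processing time, e.g. 1000 ms / 200 ms = 5.
    long ticksIn(long spanMillis) {
        return spanMillis / intervalMillis;
    }

    // Called once per interval: forward a watermark only if it moves event time forward.
    void onTick() {
        long candidate = currentWatermark.getAsLong();
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emitted() { return emitted; }
}
```

If no new data arrives between two ticks, the second tick emits nothing, which matches the "assuming there's been new data" caveat above.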
>
> getCurrentWatermark() should NOT be implemented in terms of
> System.currentTimeMillis; you do not want your watermarking to
> depend on the current processing time if you can possibly avoid it.
> Part of the beauty of event-time processing is being able to run your
> application on historic data as well as on live, real-time data, and this
> is only possible if your watermarks depend on timestamps recorded in
> the events rather than on System.currentTimeMillis.
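The difference shows up as soon as historic data is replayed. In this plain-Java sketch (hypothetical names; the wall-clock time is passed in as a parameter instead of calling System.currentTimeMillis, so the result is deterministic), a watermark derived from a wall-clock value years after the events were recorded marks every event as late, while a watermark derived from the events' own timestamps keeps pace with the data.

```java
import java.util.List;

// Hypothetical comparison of two watermark strategies on replayed (historic) events.
class ReplayComparison {
    // Counts events whose timestamp is <= the watermark in force when they arrive.
    // If wallClockNow is non-null, the watermark is wallClockNow - bound (processing-time based);
    // otherwise it is maxTimestampSeen - bound (event-time based).
    static int countLate(List<Long> timestamps, long boundMillis, Long wallClockNow) {
        long maxSeen = Long.MIN_VALUE + boundMillis; // avoids underflow in maxSeen - bound
        int late = 0;
        for (long ts : timestamps) {
            long watermark = (wallClockNow != null)
                    ? wallClockNow - boundMillis
                    : maxSeen - boundMillis;
            if (ts <= watermark) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }
}
```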
>
> You should also try to decouple your watermarking strategy from the
> specific processing you intend to do later, downstream. The primary
> question to answer when implementing watermarking is how much
> out-of-orderness your data may have. A typical timestamp assigner and
> watermark generator will look something like this, assuming that your
> events have a timestamp field and that the stream's timestamps are at
> most 10 seconds out of order:
>
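> // Flink 1.8 imports for this snippet
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // Allow events to arrive up to 10 seconds out of order.
> DataStream<MyEvent> withTimestampsAndWatermarks =
>     stream.assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>
>             @Override
>             public long extractTimestamp(MyEvent element) {
>                 // Use the timestamp carried by the event (epoch millis).
>                 return element.timestamp;
>             }
>         });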
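For intuition about what that extractor computes, here is a simplified, standalone re-creation in plain Java (the class BoundedLagWatermarks is hypothetical, not Flink's class). It assumes the watermark equals the largest timestamp seen so far minus the bound and never moves backwards; the exact boundary handling (for example, an extra -1 ms) differs between Flink versions.

```java
// Hypothetical standalone model of bounded-out-of-orderness watermarking (not Flink's class).
class BoundedLagWatermarks {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedLagWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Equivalent of extractTimestamp: read the event's own timestamp and track the maximum.
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Equivalent of getCurrentWatermark: lag the max timestamp by the bound, never go backwards.
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }
}
```

With a 10-second bound, an event stamped 45000 that arrives after one stamped 50000 is still on time, because the watermark is only at 40000.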
>
> As for your specific application requirements, you might find it
> simpler to rely on State Time-to-Live [1] rather than clearing state
> yourself.
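In Flink 1.8, TTL is configured on the state descriptor, roughly StateTtlConfig.newBuilder(Time.minutes(1)).build() passed to descriptor.enableTimeToLive(...), and in that release it is based on processing time only. The plain-Java sketch below (hypothetical TtlValue, not Flink's implementation) models the default semantics as an assumption: the TTL restarts on every write, and an expired value is never returned.

```java
// Hypothetical model of a single TTL-protected value (not Flink's implementation).
// Modeled semantics: the TTL clock restarts on every write; expired values are never returned.
class TtlValue<T> {
    private final long ttlMillis;
    private T value;
    private long lastWriteMillis;

    TtlValue(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void update(T newValue, long nowMillis) {
        value = newValue;
        lastWriteMillis = nowMillis;
    }

    // Returns null once lastWrite + ttl <= now, i.e. the value has expired.
    T value(long nowMillis) {
        if (value != null && lastWriteMillis + ttlMillis <= nowMillis) {
            value = null; // expired: drop it, as if it had been cleaned up
        }
        return value;
    }
}
```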
>
> There's no need to retain the state until the windowed join is
> completed, since the operator executing the join can't access the
> state in the CoProcessFunction. The CoProcessFunction should clear the
> state whenever it is done with it; no other part of your job will
> access it.
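A minimal plain-Java sketch of that pattern, assuming a hypothetical one-to-one match between the two inputs per key. Two maps stand in for a keyed CoProcessFunction's per-key state (this is not Flink code): whichever side arrives first is buffered, and the buffer is cleared as soon as the matching element from the other side arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a keyed two-input enrichment step (stand-in for processElement1/2).
class PairingFunction {
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void processLeft(String key, String left) {
        String right = rightState.remove(key); // read and clear the other side's state
        if (right != null) {
            output.add(left + "|" + right);
        } else {
            leftState.put(key, left);          // buffer until the match arrives
        }
    }

    void processRight(String key, String right) {
        String left = leftState.remove(key);   // read and clear the other side's state
        if (left != null) {
            output.add(left + "|" + right);
        } else {
            rightState.put(key, right);
        }
    }

    int bufferedKeys() { return leftState.size() + rightState.size(); }

    List<String> output() { return output; }
}
```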
>
> If there is a risk that the CoProcessFunction will create state that
> isn't freed, and for some reason you don't find State TTL a good
> solution for this, then you can use either a processing-time or an
> event-time timer to trigger a call to onTimer, in which you can free the
> state. For example,
>
> timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);
>
> registers an event-time timer for 60 seconds after the timestamp in an
> event: take the event's timestamp, add 60 seconds, and wait
> until the current Watermark has reached that point in time.
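The timer mechanics can be modeled in plain Java as follows (hypothetical SimulatedTimerService; a simplification, not Flink's TimerService). It registers a timer at the event's timestamp plus 60000 ms and, once a watermark reaches that time, calls onTimer, which clears the key's state. In Flink, an event-time timer for time t fires when the watermark becomes >= t, and registering the same timestamp twice for a key yields a single timer; the per-time Set below mimics that de-duplication.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified event-time timer service for keyed state cleanup (not Flink's API).
class SimulatedTimerService {
    static final long CLEANUP_DELAY_MILLIS = 60 * 1000;

    // Timers ordered by firing time; a Set per time de-duplicates keys.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    final Map<String, String> state = new HashMap<>();

    // processElement: store state and register a cleanup timer 60 s after the event's timestamp.
    void processElement(String key, String value, long eventTime) {
        state.put(key, value);
        timers.computeIfAbsent(eventTime + CLEANUP_DELAY_MILLIS, t -> new TreeSet<String>()).add(key);
    }

    // A new watermark fires every timer whose time is <= the watermark, in time order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key);
            }
        }
    }

    // onTimer: free the state for this key.
    private void onTimer(String key) {
        state.remove(key);
    }
}
```

This sketch clears unconditionally; a real function might first check whether newer data arrived for the key after the timer was registered.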
>
> The Flink training website has tutorials [2] and exercises [3] on these
> topics.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
> [2] https://training.ververica.com/lessons/event-time-watermarks.html
> [3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html
>
>
> On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
> <felipe.o.gutier...@gmail.com> wrote:
> >
> > Hi,
> >
> > I am a little confused about watermarks in Flink.
> >
> > My application is using EventTime. My sources call
> > ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a
> > CoProcessFunction which merges the two streams. This function keeps
> > state, and I want to clear that state every time the window of my
> > next operator fires. The next operator is a join that uses a
> > 1-minute window [1].
> >
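> > stream01 = source01.connect(sideoutput02).keyBy(...).process(new MyCoProcessFunction());
> > stream02 = source02.connect(sideoutput01).keyBy(...).process(new MyCoProcessFunction());
> > // assuming a 1-minute tumbling event-time window for the join
> > stream01.join(stream02).where(...).equalTo(...)
> >     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >     .apply(new MyJoinFunction()).print();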
> >
> > I am not sure whether I have to use
> > env.getConfig().setAutoWatermarkInterval(60000), or whether I have to add
> > .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks()) and
> > write the logic in the method getCurrentWatermark(). Since I want
> > a watermark every 60 seconds, I guess this method (getCurrentWatermark())
> > should return "new Watermark(System.currentTimeMillis() + 60000);",
> > but I am not sure whether it should be - or +.
> >
> > Then, in the CoProcessFunction, what time should I pass to
> > context.timerService().registerEventTimeTimer(), and what logic
> > should I use in the onTimer() method?
> >
> > [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
> >
> > Thanks,
> > Felipe
>
