What Watermarks do is to advance the event time clock. You can
consider a Watermark(t) as an assertion about the completeness of the
stream -- it marks a point in the stream and says that at that point,
the stream is (probably) now complete up to time t.

The autoWatermarkInterval determines how often new Watermarks are
created -- or in other words, how often the event-time clock will be
able to move forward. From what you've presented, it seems like you
can leave this at its default, which is 200 msec. This means that five
times a second, as your application runs, each parallel instance will
create a new watermark (assuming there's been new data and that the
event time clock can be advanced).

getCurrentWatermark() should NOT be implemented in terms of
System.currentTimeMillis -- you do not want your watermarking to
depend on the current processing time if you can possibly avoid it.
Part of the beauty of event time processing is being able to run your
application on historic data as well as live, real-time data, and this
is only possible if your watermarks depend on timestamps recorded in
the events, rather than System.currentTimeMillis.

You should also try to decouple your watermarking strategy from the
specific processing you intend to later, downstream. The primary
concern you need to have when implementing the watermarking is to
consider how much out-of-orderness your data may have. A typical
timestamp assigner and watermark generator will look something like
this, assuming that your event stream will have its timestamps at most
10 seconds out of order, and that your events have a timestamp field:

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.timestamp;
        }
});

As for your specific application requirements, you might find it
simpler to rely on State Time-to-Live [1] rather than clearing state
yourself.

There's no need to retain the state until the windowed join is
completed, since the operator executing the join can't access the
state in the CoProcessFunction. The CoProcessFunction should clear the
state whenever it is done with it; no other part of your job will
access it.

If there is a risk that the CoProcessFunction will create state that
isn't freed, and you don't for some reason find State TTL a good
solution for this, then you can use either a processing time or event
time timer to trigger a call to onTimer in which you can free the
state. For example,

timerService.registerEventTimeTimer(event.getEventTime() + 60 * 1000);

registers an event time timer for 60 seconds after the timestamp in an
event -- meaning, take the event's timestamp, add 60 seconds, and wait
until the current Watermark has surpassed that point in time.

The Flink training website has tutorials [2] and exercises [3] on these topics.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
[2] https://training.ververica.com/lessons/event-time-watermarks.html
[3] https://training.ververica.com/exercises/rideEnrichment-processfunction.html


On Wed, Aug 21, 2019 at 10:59 AM Felipe Gutierrez
<felipe.o.gutier...@gmail.com> wrote:
>
> Hi,
>
> I am a little confused about watermarkers in Flink.
>
> My application is using EventTime. My sources are calling 
> ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a 
> CoProcessFunction which merge the two streams. I have a state on this 
> function and I want to clean this state every time that I trigger the window 
> of my next operator. The next operator is a join which is using a window of 1 
> minute [1].
>
> stream01 = source01.connect(sideoutput02).keyBy().process(new 
> MyCoProcessFunction);
> stream02 = source02.connect(sideoutput01).keyBy().process(new 
> MyCoProcessFunction);
> stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print();
>
> I am confused if I have to use env.getConfig().setAutoWatermarkInterval(60 
> seconds), or if I have to add .assignTimestampsAndWatermarks(new 
> MyAssignerWithPeriodicWatermarks()) and write the logic on the method 
> getCurrentWatermark(). In my case that I want a watermark every 60 seconds, I 
> guess this method (getCurrentWatermark()) should have "return new 
> Watermark(System.currentTimeMillis() + 60000);". but it should be - or +.
>
> Then, on the CoProcessFunction what is the time that I should pass on 
> context.timerService().registerEventTimeTimer() and what is the logic that I 
> should use in the onTimer() method?
>
> [1] 
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com

Reply via email to