A while back I wrote this slightly more elaborate extractor that advances the watermark based on processing time once the stream has been idle for a while: https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java
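The core idea of such an extractor can be modelled without Flink. The sketch below is not the linked class and does not use Flink's API: `IdleAwareWatermarks`, `onEvent` and `currentWatermark` are hypothetical names, and it assumes that event timestamps roughly follow the wall clock. While events arrive it produces the usual data-driven watermark (largest timestamp minus the out-of-orderness bound); once no event has been seen for `idleTimeoutMs` of processing time, it lets the clock drive the watermark instead.

```java
// Hypothetical, Flink-independent model of an idle-aware periodic watermark.
// It is NOT the linked ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor;
// all names here are made up for illustration.
final class IdleAwareWatermarks {
    private final long maxOutOfOrdernessMs; // how late (in event time) an event may be
    private final long idleTimeoutMs;       // processing time without events before the clock takes over

    private long maxEventTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    IdleAwareWatermarks(long maxOutOfOrdernessMs, long idleTimeoutMs, long startProcessingTime) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventProcessingTime = startProcessingTime;
    }

    /** Called for every element: remember the largest timestamp and when it was seen. */
    void onEvent(long eventTimestamp, long processingTime) {
        maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
        lastEventProcessingTime = processingTime;
    }

    /** Called periodically; processingTime is the current wall-clock time. */
    long currentWatermark(long processingTime) {
        // Data-driven part: largest timestamp seen so far minus the bound.
        long candidate = (maxEventTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE
                : maxEventTimestamp - maxOutOfOrdernessMs;

        // Idle fallback. ASSUMPTION: event timestamps roughly track the wall clock,
        // so after idleTimeoutMs of silence the clock is allowed to drive the watermark.
        boolean idle = processingTime - lastEventProcessingTime >= idleTimeoutMs;
        if (idle) {
            candidate = Math.max(candidate, processingTime - maxOutOfOrdernessMs);
        }

        // Never let the emitted watermark go backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }
}
```

The trade-off is the one Fabian mentions further down in the thread: an event that shows up after an idle period with a timestamp well behind the wall clock will now be late.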
Best,
Aljoscha

> On 16. Jan 2018, at 10:29, Fabian Hueske <fhue...@gmail.com> wrote:
>
> This depends on the requirements of your application.
> With the usual watermark generation strategies, which are purely data-driven,
> a stream that does not produce data does not advance its watermarks.
> Not advancing the watermarks means that the program cannot make progress.
>
> This might be fine if your program consumes a single stream, because if
> this stream does not produce data, your program also has nothing to
> compute (though there might still be data left, such as a window, that has
> not been computed yet).
> The situation becomes trickier if your program has multiple sources that
> become inactive at some point, or a source where a partition can become
> inactive.
>
> AFAIK, there is a mechanism to mark partitions (and maybe complete sources)
> as inactive.
> @Gordon (in CC) knows more about this feature.
>
> Best,
> Fabian
>
> 2018-01-15 14:51 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
> Hi Fabian,
> I want to extract timestamps from my events. However, the event stream can be
> sparse at times (e.g. 2 days without any events).
> What's the best strategy to create watermarks if I want real-time processing
> of the events that enter the stream?
>
> Jayant Ameta
>
> On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> Another thing to point out is that watermarks are usually data-driven, i.e.,
> they depend on the timestamps of the events and not on the clock of the
> machine.
> Otherwise, you might observe a lot of late data, i.e., events with timestamps
> smaller than the last watermark.
>
> If you assign timestamps and watermarks based on the clock of the machine,
> you might as well use ingestion time instead of event time.
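For the multiple-source case Fabian describes, it helps to recall that an operator with several inputs forwards the minimum of its inputs' watermarks, so a single silent input holds everything back. The following is a simplified, Flink-independent model of the idea behind marking inputs as inactive; `CombinedWatermark`, `onWatermark`, `markIdle` and `current` are hypothetical names, not Flink API. Idle inputs are simply skipped when computing the minimum.

```java
import java.util.Arrays;

// Hypothetical, Flink-independent model of how an operator with several
// inputs combines their watermarks. Names are made up for illustration.
final class CombinedWatermark {
    private final long[] watermarks; // latest watermark per input
    private final boolean[] idle;    // inputs currently marked as inactive
    private long lastEmitted = Long.MIN_VALUE;

    CombinedWatermark(int numInputs) {
        watermarks = new long[numInputs];
        idle = new boolean[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** A new watermark from one input; receiving it also makes the input active again. */
    void onWatermark(int input, long watermark) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        idle[input] = false;
    }

    /** Mark an input (e.g. a partition without data) as inactive. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over the active inputs; never moves backwards and stalls if all inputs are idle. */
    long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
        }
        return lastEmitted;
    }
}
```

Without the `markIdle(1)` call in a two-input setup, a second input that never produces anything keeps the combined watermark at `Long.MIN_VALUE` no matter how far the first input advances, which is the "cannot make progress" situation described above.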
>
> 2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyam...@gmail.com>:
> Thanks Gary,
> I was only trying with a fixed set of events, so the Watermark was not
> advancing, like you said.
>
> Jayant Ameta
>
> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <g...@data-artisans.com> wrote:
> Hi Jayant,
>
> The difference is that the Watermarks from
> BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp of
> all previous events. That is, if you do not receive new events, the Watermark
> will not advance. In contrast, your custom implementation of
> AssignerWithPeriodicWatermarks always advances the Watermark based on the
> wall clock.
>
> Maybe this will already help you to debug your application. If not, it would
> be great to see a minimal working example.
>
> Best,
> Gary
>
> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyam...@gmail.com> wrote:
> Hi,
> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not
> firing. However, the trigger fires when using a custom timestamp extractor
> with a similar watermark.
>
> Sample code below:
>
> 1. Assigner as an anonymous class, which works fine:
>
> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>         new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>     @Override
>     public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
>         return System.currentTimeMillis();
>     }
>
>     @Override
>     public final Watermark getCurrentWatermark() {
>         // based only on the wall clock, so it advances even when no events arrive
>         return new Watermark(System.currentTimeMillis() - 100);
>     }
> };
>
> 2. BoundedOutOfOrdernessTimestampExtractor assigner, which doesn't work:
>
> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner =
>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
>     @Override
>     public long extractTimestamp(Tuple2<Rule, T> element) {
>         return System.currentTimeMillis();
>     }
> };
>
> Do you see any difference in the approaches?
>
> - Jayant
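Gary's explanation can be checked with a small simulation. The sketch below is a Flink-independent model, not Flink code: the nested classes only mirror the watermark logic of the two assigners in the question (largest seen timestamp minus the bound vs. wall clock minus a fixed lag), and `windowFires` assumes the event-time trigger fires once the watermark reaches the window's last timestamp, `end - 1`. All class and method names are hypothetical.

```java
// Hypothetical, Flink-independent comparison of the two assigners in the
// question. Each watermark() call stands for one periodic getCurrentWatermark().
final class WatermarkComparison {
    private WatermarkComparison() {}

    /** Mirrors the idea of BoundedOutOfOrdernessTimestampExtractor: max seen timestamp minus the bound. */
    static final class BoundedOutOfOrderness {
        private final long boundMs;
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrderness(long boundMs) { this.boundMs = boundMs; }

        void onEvent(long timestamp) { maxTimestamp = Math.max(maxTimestamp, timestamp); }

        long watermark() {
            // Only moves when a new, larger timestamp has been seen.
            return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMs;
        }
    }

    /** Mirrors the anonymous assigner: ignores the events and follows the clock. */
    static final class WallClock {
        private final long lagMs;

        WallClock(long lagMs) { this.lagMs = lagMs; }

        long watermark(long nowMs) { return nowMs - lagMs; }
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }
}
```

With a fixed set of events stamped with the processing time at which they arrived (say 1000, 1200 and 1400 ms), the data-driven watermark stops at 1300 ms, so a window ending at 2000 ms never fires, while the wall-clock watermark at 5000 ms is already 4900 ms and fires it.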