Hi,

1. According to the docs [1], a Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t' <= t (i.e. events with timestamps older than or equal to the watermark). So generating a watermark that is larger than the timestamp of the current element is counterintuitive. At the very least you should subtract the bound rather than add it.

2. By that definition, the watermark is not related to the length of the window. The bound depends on your application.

3. In your case AssignerWithPunctuatedWatermarks might not be a good choice. Watermarks are not free, and you might end up sending too many of them. That interface is a good fit if your source itself can generate some kind of "watermark" element. Otherwise you could choose AssignerWithPeriodicWatermarks. You can find an example in the docs [2].
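To make points 1 and 2 a bit more concrete, here is a minimal plain-Java sketch of the "highest timestamp seen minus bound" idea. It deliberately does not use the Flink API: the class BoundedWatermarkSketch and its methods are hypothetical names made up for illustration, and the 5000 ms bound is only the value from your code. The window check assumes the usual event-time rule that a window [start, end) fires once the watermark reaches end - 1.

```java
// Plain-Java illustration only; this is NOT Flink code.
// All names here are hypothetical and chosen for this sketch.
final class BoundedWatermarkSketch {
    private final long boundMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedWatermarkSketch(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    /** Record the event timestamp of an element; elements may arrive out of order. */
    void onElement(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    /** Watermark = highest timestamp seen so far minus the bound (never plus). */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampSeen - boundMillis;
    }

    /** Assumed rule: a window [start, end) may fire once the watermark reaches end - 1. */
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }
}
```

With a bound of 5000 ms, an element with timestamp 7000 gives a watermark of 2000, so the window [0, 5000) stays open and can still accept an element with timestamp 3000 that arrives afterwards. With "+ bound" the same element would push the watermark to 12000 and that window would already be closed. Note that the window length never appears in the watermark computation.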
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators

Best,
Guowei

Vijay Balakrishnan <bvija...@gmail.com> wrote on Wed, Apr 10, 2019 at 7:41 AM:

> Hi,
> I have created a TimestampAssigner as follows.
> I want to use monitoring.getEventTimestamp() with Event Time processing
> and collect aggregated stats over time window intervals of 5 secs, 5 mins
> etc. Is this the right way to create the TimeWaterMarkAssigner with a
> bound? I want to collect the stats for each eventTimestamp + window interval.
> My understanding: the generated watermark, which is eventTimestamp +
> bound, will collect all the eventTimestamps which arrive within that
> Watermark inside each eventTimestamp + 5s etc. window interval. Or does this
> bound have to be based on the windowInterval, i.e. extractedTimestamp +
> windowInterval + bound?
>
>> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
>>     private long bound = 5 * (long) 1000;
>>
>>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>>         return monitoring.getEventTimestamp();
>>     }
>>
>>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>>         return new Watermark(extractedTimestamp + bound); //<==== should it be - bound ?
>>     }
>> }
>
> Used here:
>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> final DataStreamSource<Monitoring> monitoringDataStreamSource =
>>     env.addSource(....);
>> DataStream<Monitoring> kinesisStream =
>>     monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>>     kinesisStream.keyBy("deployment", .....);
>> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>>     monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); //5 sec time window
>
> TIA,
>