Hi,

I have created a TimestampAssigner as follows. I want to use monitoring.getEventTimestamp() with event-time processing and collect aggregated stats over time window intervals of 5 secs, 5 mins, etc. Is this the right way to create the timestamp and watermark assigner with a bound? I want to collect the stats for each eventTimestamp + window interval.

My understanding: the generated watermark, which is eventTimestamp + bound, will collect all the events whose eventTimestamp arrives within that watermark, inside each eventTimestamp + 5s (etc.) window interval. Or does this bound have to be based on the window interval, i.e. extractedTimestamp + windowInterval + bound?
> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
>     private long bound = 5 * (long) 1000;
>
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>     }
> }

Used here:

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(....);
> DataStream<Monitoring> kinesisStream =
>     monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy("deployment", .....);
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>     monitoringTupleKeyedStream.timeWindow(Time.seconds(5)); // 5 sec time window

TIA,
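
P.S. To make the question concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic as I currently understand it. The class and method names are hypothetical and not part of my job. It assumes tumbling windows aligned to the epoch, that a watermark W means "no more events with timestamp <= W are expected", and that a window [start, start + size) may fire once the watermark reaches start + size - 1. Please correct me if any of these assumptions is wrong.

```java
// Plain-Java sketch of the watermark/window arithmetic; all names are hypothetical.
final class WatermarkSketch {

    // Start of the tumbling window that contains the timestamp
    // (assumes windows aligned to the epoch and non-negative timestamps).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Watermark candidate when the bound is subtracted from the event timestamp.
    static long watermarkMinusBound(long eventTimestampMs, long boundMs) {
        return eventTimestampMs - boundMs;
    }

    // Assumption: the window may fire once the watermark reaches its last timestamp.
    static boolean windowCanFire(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;     // 5 sec window, as in my job
        long bound = 5_000L;    // same bound as in MonitoringTSWAssigner
        long eventTs = 12_300L; // example event timestamp

        long start = windowStart(eventTs, size);
        long wmPlus = eventTs + bound;                      // what my assigner emits today
        long wmMinus = watermarkMinusBound(eventTs, bound); // the "- bound" alternative

        System.out.println("window start        = " + start);
        System.out.println("fires with + bound  = " + windowCanFire(start, size, wmPlus));
        System.out.println("fires with - bound  = " + windowCanFire(start, size, wmMinus));
    }
}
```

If these assumptions hold, then with + bound the window containing an event could already fire on that same event, so anything arriving afterwards for that window would count as late, whereas with - bound the window stays open until events roughly bound milliseconds newer have been seen. That is the part I would like confirmed.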