Hi,
I have created a timestamp and watermark assigner (code below).
I want to use monitoring.getEventTimestamp() with event-time processing
and collect aggregated stats over time window intervals of 5 secs, 5 mins,
etc. Is this the right way to create the assigner with a bound? I want to
collect the stats for each eventTimestamp + window interval.
My understanding: the generated watermark, eventTimestamp + bound, will
collect all the events whose eventTimestamp arrives within that watermark,
inside each eventTimestamp + 5 s (etc.) window interval. Or does the bound
have to be based on the window interval, i.e. extractedTimestamp +
windowInterval + bound?
> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
>     // Bound in milliseconds (5 seconds).
>     private final long bound = 5 * 1000L;
>
>     @Override
>     public long extractTimestamp(Monitoring monitoring, long previousTS) {
>         return monitoring.getEventTimestamp();
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
>         return new Watermark(extractedTimestamp + bound); // <==== should it be - bound ?
>     }
> }
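To make the two candidates (+ bound and - bound) concrete, here is a minimal plain-Java sketch with no Flink dependency. It rests on assumptions that should be checked against the Flink version in use: tumbling windows are aligned to multiples of the window size (offset 0), an event-time window fires once the watermark reaches the window's last timestamp (end - 1), and with no allowed lateness an element is treated as late once the watermark has reached that same timestamp. The class and method names (WatermarkSketch, windowStart, windowFires, isLate) are hypothetical and exist only for this illustration.

```java
// Plain-Java illustration only; none of these names are Flink API.
class WatermarkSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes offset 0 and non-negative timestamps, in milliseconds).
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // Largest timestamp that still belongs to [windowStart, windowStart + size).
    static long windowMaxTimestamp(long windowStart, long windowSizeMs) {
        return windowStart + windowSizeMs - 1;
    }

    // Assumed firing rule: the window fires once the watermark reaches its max timestamp.
    static boolean windowFires(long watermark, long windowStart, long windowSizeMs) {
        return watermark >= windowMaxTimestamp(windowStart, windowSizeMs);
    }

    // Assumed lateness rule (no allowed lateness): an element is late if the
    // watermark has already reached the max timestamp of the element's window.
    static boolean isLate(long eventTimestamp, long watermark, long windowSizeMs) {
        long start = windowStart(eventTimestamp, windowSizeMs);
        return windowMaxTimestamp(start, windowSizeMs) <= watermark;
    }

    public static void main(String[] args) {
        long window = 5_000L;
        long bound = 5_000L;
        long first = 12_000L;   // falls into window [10000, 15000)
        long second = 11_000L;  // arrives afterwards, out of order, same window

        long plus = first + bound;   // 17000, watermark emitted after the first event
        long minus = first - bound;  // 7000

        System.out.println("+ bound: window fires after first event = "
                + windowFires(plus, windowStart(first, window), window));
        System.out.println("+ bound: second event late = " + isLate(second, plus, window));
        System.out.println("- bound: window fires after first event = "
                + windowFires(minus, windowStart(first, window), window));
        System.out.println("- bound: second event late = " + isLate(second, minus, window));
    }
}
```

Under these assumptions, a watermark of timestamp + bound already passes the end of the first event's own 5 s window, so an out-of-order event for that window would arrive late, whereas timestamp - bound keeps the window open for events up to bound milliseconds behind. Whether that matches the intended behaviour is exactly the question above.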
Used here:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> final DataStreamSource<Monitoring> monitoringDataStreamSource = env.addSource(....);
> DataStream<Monitoring> kinesisStream =
>     monitoringDataStreamSource.assignTimestampsAndWatermarks(new MonitoringTSWAssigner());
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream =
>     kinesisStream.keyBy("deployment", .....);
> // 5 sec time window
> final WindowedStream<Monitoring, Tuple, TimeWindow> windowStream =
>     monitoringTupleKeyedStream.timeWindow(Time.seconds(5));
TIA,