Hi, Observations on Watermarks: Read this great article: https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy
* Watermark means when for any event TS, when to stop waiting for arrival of earlier events. * Watermark t means all events with Timestamp < t have already arrived. * When to push data out - When watermark with TS >= t arrives Only *using incrementing current time for watermark seems to be working correctly* but not sure if it aligns up correctly with EventTime processing. *Using the incoming records intervalStart as the Watermark source for EventTime causes data to not be pushed at all* in cases when i have just 5 records in the Source. My source generation for intervalStart has intervalStart incrementing at a regular interval. I tried using the intervalStart for my Watermark with a out of order late boundedness of 3 secs. The *AggregateFunction* I am using calls the add() fine but *never calls the getResult().* My assumption was that the AggregateFunction I am using would push the data to getResult based on the Watermark based on intervalStart incrementing beyong the previous watermark t. But it doesn't -is it because I have limited number of input records and once intervalStart gets to the end of the input records too fast, it stops incrementing the watermar and hence doesn't push data ? With System.currentTimeMillis, it happily keeps increasing and hence pushes the data. Created this class: public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> { private long bound = 3 * 1000;//3 secs out of order bound in millisecs public MonitoringAssigner(long bound) { this.bound = bound; } public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) { long nextWatermark = extractedTimestamp - bound; //simply emit a Watermark with every event return new Watermark(nextWatermark); } @Override public long extractTimestamp(Monitoring monitoring, long previousTS) { /*LocalDateTime intervalStart = Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 02:21:06.057 long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);//*using this stopped pushing recs after a certain time* return extractedTS;*/ return *System.currentTimeMillis*();//incrementing current time }