Hi Kostas,

Yes, I have a flag in my timestamp extractor.
As you can see from the code below, I check whether currentTime - systemTimeSinceLastModification > 10 sec. As long as new events keep arriving, the watermark is not incremented artificially. But as soon as that difference exceeds 10 seconds, I increment the watermark by 1 sec. I feel this is very small, and I will try incrementing the watermark by a higher value, but this is what I am doing for now.

import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimestampAndWatermarkGenerator implements AssignerWithPeriodicWatermarks<BAMEvent> {

    private final long maxOutOfOrderness = 10000; // 10 seconds

    // Highest event timestamp seen so far (also bumped manually while idle).
    private long currentMaxTimestamp;
    // Wall-clock time of the last event or of the last manual bump.
    private long systemTimeSinceLastModification;
    // Set once the first event has arrived.
    private boolean firstEventFlag = false;

    private Logger log = LoggerFactory.getLogger(TimestampAndWatermarkGenerator.class);

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long currentTime = System.currentTimeMillis();
        // No event for more than 10 seconds: push the watermark forward by 1 second.
        if (firstEventFlag && (currentTime - systemTimeSinceLastModification > 10000)) {
            systemTimeSinceLastModification = currentTime;
            currentMaxTimestamp = currentMaxTimestamp + 1000;
            //log.info("Current Max Time - {}, Last Modification Time - {}", currentMaxTimestamp, systemTimeSinceLastModification);
        }
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(BAMEvent bamEvent, long l) {
        long timestamp = bamEvent.getTimestamp();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        // Remember when the last event arrived so idleness can be detected.
        systemTimeSinceLastModification = System.currentTimeMillis();
        firstEventFlag = true;
        //log.info("Current Max Time - {}, Current Event Time - {}", currentMaxTimestamp, systemTimeSinceLastModification);
        return timestamp;
    }
}
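
To make it easier to experiment with a larger increment, the idle-advance logic described above could be pulled out of the Flink class and driven by a fake clock. The following is only a minimal sketch under assumptions: the class name IdleAdvanceTracker, its constructor parameters (idleTimeout, idleStep) and the injected LongSupplier clock are hypothetical names introduced for illustration and are not part of the Flink API or of the original code.

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not a Flink class): same logic as the assigner above,
// but with the timeout, the step and the clock passed in as parameters.
final class IdleAdvanceTracker {
    private final long maxOutOfOrderness; // allowed lateness in ms
    private final long idleTimeout;       // wall-clock ms without events before bumping
    private final long idleStep;          // event-time ms added per bump
    private final LongSupplier clock;     // wall-clock source, replaceable in tests

    private long currentMaxTimestamp;
    private long lastModification;
    private boolean seenEvent;

    IdleAdvanceTracker(long maxOutOfOrderness, long idleTimeout,
                       long idleStep, LongSupplier clock) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
        this.idleStep = idleStep;
        this.clock = clock;
    }

    // Corresponds to extractTimestamp(): record the event and reset the idle timer.
    long onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(eventTimestamp, currentMaxTimestamp);
        lastModification = clock.getAsLong();
        seenEvent = true;
        return eventTimestamp;
    }

    // Corresponds to getCurrentWatermark(): bump event time by idleStep
    // whenever more than idleTimeout ms passed since the last event or bump.
    long currentWatermark() {
        long now = clock.getAsLong();
        if (seenEvent && now - lastModification > idleTimeout) {
            lastModification = now;
            currentMaxTimestamp += idleStep;
        }
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

With idleTimeout = 10000 and idleStep = 1000 this behaves like the assigner above. Setting idleStep closer to idleTimeout would let the watermark move at roughly wall-clock speed while the source is idle, which may or may not be appropriate depending on how far event time and processing time drift apart in the job.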