In case this helps, here is a Scala helper I use to filter out late data on a KeyedStream. The timestamp of the last accepted element is kept in keyed state, so lateness is tracked per key.
```
// Provides createTypeInformation, which filterWithState needs for its state type
import org.apache.flink.streaming.api.scala._

// Hypothetical wrapper object: implicit classes cannot be top-level in Scala 2
object StreamImplicits {

  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {

    // Keeps an element only if its timestamp is strictly greater than the last
    // accepted timestamp for its key. Otherwise the element is passed to
    // outOfOrderHandler together with that last timestamp, and then dropped.
    def filterStrictlyAscendingTime(timestampExtractor: T => Long)(outOfOrderHandler: (T, Long) => Unit): DataStream[T] = {
      stream.filterWithState((currentElement: T, prevElementTimestamp: Option[Long]) => {
        val currentElementTimestamp = timestampExtractor(currentElement)
        prevElementTimestamp match {
          case None => (true, Some(currentElementTimestamp))
          case Some(t) =>
            if (currentElementTimestamp > t) {
              (true, Some(currentElementTimestamp))
            } else {
              outOfOrderHandler(currentElement, t)
              (false, Some(t))
            }
        }
      })
    }

    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] = {
      filterStrictlyAscendingTime(timestampExtractor) { (element, timestamp) =>
        // FLINK-2870 should provide a more idiomatic way to ignore late arrivals
        ()
      }
    }
  }
}
```

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
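To make the per-key semantics concrete without running a Flink job, here is a minimal plain-Scala model of the same filtering rule. It is a sketch, not the Flink code: an immutable `Map` stands in for Flink's keyed state, and `StrictlyAscendingModel` and `Event` are hypothetical names used only for illustration. Note that an element whose timestamp *equals* the last accepted one is dropped too, because the comparison is strictly greater-than.

```scala
// Plain-Scala model of the per-key filter (no Flink involved).
// A Map[String, Long] plays the role of keyed state: last accepted timestamp per key.
object StrictlyAscendingModel {
  // Hypothetical event type for illustration only
  final case class Event(key: String, timestamp: Long)

  /** Returns (kept, dropped), each in input order. An event is kept only if
    * its timestamp is strictly greater than the last kept timestamp for its key. */
  def filterStrictlyAscending(events: Seq[Event]): (Vector[Event], Vector[Event]) = {
    val init = (Map.empty[String, Long], Vector.empty[Event], Vector.empty[Event])
    val (_, kept, dropped) = events.foldLeft(init) { case ((lastSeen, keptAcc, droppedAcc), e) =>
      lastSeen.get(e.key) match {
        case Some(last) if e.timestamp <= last =>
          // Late (or equal) timestamp for this key: drop it, state unchanged
          (lastSeen, keptAcc, droppedAcc :+ e)
        case _ =>
          // First element for this key, or strictly newer: keep it and advance the state
          (lastSeen.updated(e.key, e.timestamp), keptAcc :+ e, droppedAcc)
      }
    }
    (kept, dropped)
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("a", 10), Event("b", 5), Event("a", 8),
      Event("a", 12), Event("b", 5), Event("b", 7)
    )
    val (kept, dropped) = filterStrictlyAscending(events)
    println(s"kept:    $kept")
    println(s"dropped: $dropped")
  }
}
```

Because the state is per key, `Event("b", 5)` is accepted even though key `a` has already seen timestamp 10; only the later `Event("a", 8)` and the repeated `Event("b", 5)` are filtered out.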