In case this helps, here is a Scala helper I use to filter out late data
on a KeyedStream. The last accepted timestamp is kept as per-key state.

```
  /**
    * Extension methods for a `KeyedStream` that drop elements whose timestamp
    * does not strictly increase relative to the last element accepted for the
    * same key.
    *
    * @param stream the keyed stream being filtered
    * @tparam T element type of the stream
    * @tparam K key type of the stream
    */
  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {

    /**
      * Keeps only elements whose extracted timestamp is strictly greater than
      * the timestamp stored for the previously accepted element.
      *
      * The first element seen (no stored timestamp yet) is always accepted.
      * A rejected element is handed to `outOfOrderHandler` and the stored
      * timestamp is left unchanged.
      *
      * @param timestampExtractor extracts the timestamp of an element
      * @param outOfOrderHandler  invoked with the rejected element and the
      *                           stored timestamp it failed to exceed
      * @return a stream containing only the accepted elements
      */
    def filterStrictlyAscendingTime(timestampExtractor: T => Long)(
        outOfOrderHandler: (T, Long) => Unit): DataStream[T] =
      stream.filterWithState { (currentElement: T, prevElementTimestamp: Option[Long]) =>
        val currentElementTimestamp = timestampExtractor(currentElement)
        prevElementTimestamp match {
          // Equal or older timestamp: report it, drop it, keep the stored state.
          case Some(t) if currentElementTimestamp <= t =>
            outOfOrderHandler(currentElement, t)
            (false, prevElementTimestamp)
          // First element, or strictly newer: accept and remember its timestamp.
          case _ =>
            (true, Some(currentElementTimestamp))
        }
      }

    /**
      * Same filtering as [[filterStrictlyAscendingTime]], but rejected
      * elements are silently discarded.
      *
      * @param timestampExtractor extracts the timestamp of an element
      * @return a stream containing only the accepted elements
      */
    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] =
      // FLINK-2870 should provide a more idiomatic way to ignore late arrivals
      filterStrictlyAscendingTime(timestampExtractor) { (_, _) => () }

  }
```



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Reply via email to