
I have a streaming application (event time) in which all events are
assigned the same timestamp. I receive 10000 events in total within a
window of 5 minutes, but I emit a watermark once 9000 elements have been
received. This watermark is 6 minutes later than the assigned timestamp.
My question is: why does the function associated with the window see
10000 elements and not 9000? The remaining 1000 elements have a timestamp
lower than the watermark, so I would expect them to be ignored, but that
is not happening.
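
To make the expectation concrete, here is a minimal plain-Scala sketch of
the rule I am assuming (it does not use Flink, and it is only my mental
model, not necessarily what Flink does). The names Event and simulate are
made up for illustration, and the timestamp is fixed at 0 to keep the run
deterministic.

```scala
// Plain Scala simulation (no Flink dependency) of the expected behaviour.
object LateElementExpectation {
  // Hypothetical event: sequence number plus event-time timestamp in ms.
  final case class Event(seq: Long, timestamp: Long)

  /** Returns (accepted, dropped) under the assumed rule: an event is
    * dropped when its timestamp is lower than the current watermark. */
  def simulate(total: Int, watermarkAtSeq: Long, ts: Long, watermarkLagMs: Long): (Int, Int) = {
    var watermark = Long.MinValue
    var accepted = 0
    var dropped = 0
    for (seq <- 1L to total.toLong) {
      val e = Event(seq, ts)
      if (e.timestamp < watermark) dropped += 1 else accepted += 1
      // The watermark is emitted after the element that triggers it.
      if (e.seq == watermarkAtSeq) watermark = e.timestamp + watermarkLagMs
    }
    (accepted, dropped)
  }

  def main(args: Array[String]): Unit = {
    val sixMinutes = 6 * 60 * 1000L
    val (accepted, dropped) = simulate(10000, 9000L, 0L, sixMinutes)
    println(s"accepted=$accepted dropped=$dropped")
  }
}
```

Under this assumed rule the window function would see 9000 elements and
the last 1000 would be discarded as late.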

Here is part of the code:
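import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment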
val rawStream = env.socketTextStream("localhost", 4321)

val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
      // fixed once, so every element is assigned the same timestamp
      val timestamp = System.currentTimeMillis()

      override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
        timestamp

      // emit one watermark, 6 minutes ahead, when the element numbered 9000 arrives
      override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark = {
        if (lastElement._3 == 9000) {
          val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
          new Watermark(ts)
        } else null
      }
}

val stream = rawStream.map(line => {
      // each input line has three space-separated fields
      val Array(p1, p2, p3) = line.split(" ")
      (p1, p2.toInt, p3.toLong)
})

stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)


