Hi, I have a streaming (event time) application where I am receiving events with the same assigned timestamp. I receive 10000 events in total on a window of 5 minutes, but I emit water mark when 9000 elements have been received. This watermark is 6 minutes after the assigned timestamps. My question is: why the function that is associated with the window reads 10000 elements and not 9000? All elements that have a timestamp lower than the watermark should be ignored (1000), but it's not happening.
Here is part of the code: « val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val rawStream = env.socketTextStream("localhost", 4321) val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] { val timestamp = System.currentTimeMillis(); override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long = timestamp override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark = { if(lastElement._3 == 9000) { val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6) new watermark.Watermark(ts) } else null } } val stream = rawStream.map(line => { val Array(p1, p2, p3) = line.split(" ") (p1, p2.toInt, p3.toLong) }) .assignTimestampsAndWatermarks(punctuatedAssigner) stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function) » Thanks!