Hi, I am using Flink 1.1.3 and following example doesn't work for me as expected.
I've got three input elements with similar timestamp (equaling to window maxTimestamp). I'm using /event time/ notion of time with /TumblingEventTimeWindows/. I would expect all three elements to be processed in the same window, because they all have the identical event time timestamp. But the result I'm getting is just the first element that triggers the window. The rest of elements are considered as late-comers and discarded. >From my point of view this is definitely not correct and should be fixed. Could you clarify if this is correct behavior or bug? I think the problem is in /WindowOperator#processWatermark/. Timer should be fired if and only if the current watermark is strictly larger than registered timer. / Timer<K, W> timer = watermarkTimersQueue.peek(); if (timer != null && timer.timestamp <= mark.getTimestamp()) { / Thanks Jaromir Vanek / public class WindowingTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); List<Tuple2<Instant, Integer>> elements = Arrays.asList( new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100), new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200), new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300) ); DataStreamSource<Tuple2<Instant, Integer>> input = env.fromCollection(elements); SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped = input.assignTimestampsAndWatermarks(new PunctuatedAssigner()); timestamped.timeWindowAll(Time.minutes(1)) .sum(1) .print(); // printed result // (2016-12-19T10:59:59.999Z,100) env.execute(); } private static class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> { @Override public long extractTimestamp(Tuple2<Instant, Integer> element, long previousElementTimestamp) { return element.f0.toEpochMilli(); } @Override public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement, long extractedTimestamp) { return new Watermark(extractedTimestamp); } } } / -- View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.