Hi Jaromir,

thank you very much for reporting this issue. The behavior you are describing is not in line with the documentation of watermarks [1], which clearly states that a watermark of time t tells the system that no more events with a timestamp < t will occur (otherwise they would be considered late events). Hence, events with a timestamp equal to t, as in your case, should be fine and should not be considered late.
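The rule quoted from [1] can be written down as a one-line predicate. This is a minimal Java sketch of that documented contract only, not of Flink's implementation; the class DocumentedLateness and the method isLateAsDocumented are hypothetical names, not Flink API.

```java
import java.time.Instant;

public class DocumentedLateness {

    // Lateness as stated in the documentation: after Watermark(t), no more
    // elements with a timestamp < t are expected, so only those are late.
    // (Hypothetical helper, not part of Flink.)
    static boolean isLateAsDocumented(long elementTimestamp, long currentWatermark) {
        return elementTimestamp < currentWatermark;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        // An element carrying exactly the watermark's timestamp is on time...
        System.out.println(isLateAsDocumented(t, t));     // false
        // ...while one millisecond earlier it would be late.
        System.out.println(isLateAsDocumented(t - 1, t)); // true
    }
}
```

Under this reading, the second and third elements in the report (which arrive after Watermark(t) with timestamp t) should not be dropped.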
I think this is not intended and is probably a bug. I'll loop in some contributors who are more familiar with watermarks and event time (cc Aljoscha, Kostas K, Stephan).

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks

2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaro...@gmail.com>:
> Hi,
>
> I am using Flink 1.1.3 and the following example doesn't work as I
> expected.
>
> I've got three input elements with identical timestamps (equal to the
> window's maxTimestamp). I'm using the /event time/ notion of time with
> /TumblingEventTimeWindows/.
>
> I would expect all three elements to be processed in the same window,
> because they all have the identical event-time timestamp. But the result
> I'm getting contains only the first element, which triggers the window.
> The rest of the elements are considered late-comers and discarded.
>
> From my point of view this is definitely not correct and should be fixed.
> Could you clarify whether this is correct behavior or a bug?
>
> I think the problem is in /WindowOperator#processWatermark/. The timer
> should be fired if and only if the current watermark is strictly larger
> than the registered timer.
>
>     Timer<K, W> timer = watermarkTimersQueue.peek();
>     if (timer != null && timer.timestamp <= mark.getTimestamp()) {
>
> Thanks
> Jaromir Vanek
>
> import java.time.Instant;
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class WindowingTest {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // three elements with the same timestamp, which is the
>         // maxTimestamp of the one-minute window [10:59:00, 11:00:00)
>         List<Tuple2<Instant, Integer>> elements = Arrays.asList(
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
>         );
>
>         DataStreamSource<Tuple2<Instant, Integer>> input =
>             env.fromCollection(elements);
>
>         SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
>             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
>
>         timestamped.timeWindowAll(Time.minutes(1))
>             .sum(1)
>             .print();
>
>         // printed result
>         // (2016-12-19T10:59:59.999Z,100)
>
>         env.execute();
>     }
>
>     private static class PunctuatedAssigner
>             implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
>
>         @Override
>         public long extractTimestamp(Tuple2<Instant, Integer> element,
>                                      long previousElementTimestamp) {
>             return element.f0.toEpochMilli();
>         }
>
>         @Override
>         public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
>                                                   long extractedTimestamp) {
>             // emit a watermark equal to each element's timestamp
>             return new Watermark(extractedTimestamp);
>         }
>     }
> }
>
> --
> View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.
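To see why the quoted comparison produces the reported result, here is a minimal, self-contained Java model of the scenario. It is not Flink code: it assumes zero-offset, epoch-aligned one-minute tumbling windows [start, start + size), a punctuated Watermark(timestamp) after every element as in PunctuatedAssigner, and a final Long.MAX_VALUE watermark standing in for end of input. It first checks that the elements' timestamp equals the window's maxTimestamp, then replays the three elements with the quoted "<=" timer check and with the proposed strict "<" check. WatermarkFiringModel, windowMaxTimestamp, timerDue and replay are hypothetical names.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class WatermarkFiringModel {

    static final long WINDOW_SIZE = 60_000L; // Time.minutes(1) in milliseconds

    // Largest timestamp of the tumbling window containing ts, assuming
    // zero-offset, epoch-aligned windows [start, start + size).
    static long windowMaxTimestamp(long ts) {
        long start = ts - (ts % WINDOW_SIZE);
        return start + WINDOW_SIZE - 1;
    }

    // strict == false models the quoted check (timer.timestamp <= watermark);
    // strict == true models the proposed check (timer.timestamp < watermark).
    static boolean timerDue(long timer, long watermark, boolean strict) {
        return strict ? timer < watermark : timer <= watermark;
    }

    // Simplified replay of a single window: all elements are assumed to fall
    // into the window of the first element, and each element is followed by
    // a punctuated Watermark(timestamp). Once the window has fired, the
    // remaining elements are dropped as late. Returns the emitted sums.
    static List<Long> replay(long[] timestamps, int[] values, boolean strict) {
        List<Long> emitted = new ArrayList<>();
        long timer = windowMaxTimestamp(timestamps[0]);
        long watermark = Long.MIN_VALUE;
        long sum = 0;
        boolean fired = false;
        for (int i = 0; i < timestamps.length && !fired; i++) {
            sum += values[i];
            watermark = Math.max(watermark, timestamps[i]);
            if (timerDue(timer, watermark, strict)) {
                emitted.add(sum);
                fired = true; // later elements are never added
            }
        }
        if (!fired && timerDue(timer, Long.MAX_VALUE, strict)) {
            emitted.add(sum); // the end-of-input watermark flushes the window
        }
        return emitted;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        System.out.println(windowMaxTimestamp(t) == t); // true
        long[] timestamps = {t, t, t};
        int[] values = {100, 200, 300};
        System.out.println(replay(timestamps, values, false)); // [100]
        System.out.println(replay(timestamps, values, true));  // [600]
    }
}
```

With the "<=" comparison the model emits [100], matching the result printed in the report; with the strict comparison it emits [600]. In this model a fired window also drops later elements, so one comparison governs both firing and lateness. Whether changing only processWatermark would be enough in Flink itself depends on how WindowOperator decides that a window is late, which this sketch does not model.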