Aljoscha, thank you very much for the explanation. It seems that using /AscendingTimestampExtractor/ would indeed solve my problem, because emitting a watermark of "currentTimestamp - 1" is the correct way to wait for all elements with an identical timestamp.
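To make the difference concrete, here is a minimal plain-Java sketch. It is only a simplified model and not Flink's WindowOperator: the class name, the method name and the watermarkLag parameter are hypothetical, and it assumes a single tumbling window whose timer fires as soon as "timer.timestamp <= watermark", with a punctuated watermark emitted after every element.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Simplified model for illustration only; this is NOT Flink's WindowOperator.
final class WatermarkSemanticsSketch {

    // Returns how many elements are added to the window [windowStart, windowEnd)
    // before its timer fires. After each element, a punctuated watermark of
    // (timestamp - watermarkLag) is emitted (watermarkLag is a hypothetical knob).
    static int elementsBeforeFiring(List<Long> timestamps, long windowStart,
                                    long windowEnd, long watermarkLag) {
        long maxTimestamp = windowEnd - 1;   // timer registered for the window
        long watermark = Long.MIN_VALUE;     // no watermark seen yet
        int accepted = 0;
        for (long ts : timestamps) {
            if (ts < windowStart || ts >= windowEnd) {
                continue;                    // belongs to a different window
            }
            if (maxTimestamp <= watermark) {
                break;                       // timer already fired; the rest are late
            }
            accepted++;
            // Watermarks never go backwards.
            watermark = Math.max(watermark, ts - watermarkLag);
        }
        return accepted;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long windowEnd = Instant.parse("2016-12-19T11:00:00Z").toEpochMilli();
        long windowStart = windowEnd - 60_000L;
        List<Long> input = Arrays.asList(ts, ts, ts);

        // Watermark equal to the element timestamp: only the first element gets in.
        System.out.println(elementsBeforeFiring(input, windowStart, windowEnd, 0L)); // 1
        // Watermark of "timestamp - 1": all three elements get in.
        System.out.println(elementsBeforeFiring(input, windowStart, windowEnd, 1L)); // 3
    }
}
```

In this model a lag of 0 corresponds to my PunctuatedAssigner quoted below, and a lag of 1 corresponds to what /AscendingTimestampExtractor/ does.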
But I can see this is not true for /BoundedOutOfOrdernessTimestampExtractor/, where the watermark is used as is, without the "-1":

    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

I think these two implementations should use the same principle.

Aljoscha Krettek-2 wrote
> I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> watermarks:
>
> "A Watermark tells operators that receive it that no elements with a
> timestamp older or equal to the watermark timestamp should arrive at the
> operator."
>
> The system also relies on this fact, as visible in how timers are read from
> the watermark timers queue and in AscendingTimestampExtractor, which has
> this code:
>
> public final Watermark getCurrentWatermark() {
>     return new Watermark(currentTimestamp == Long.MIN_VALUE ?
>         Long.MIN_VALUE : currentTimestamp - 1);
> }
>
> Notice, how the watermark is "currentTimestamp - 1" where current timestamp
> is the highest seen timestamp so far and where we assume monotonically
> ascending timestamps.
>
> Cheers,
> Aljoscha
>
> On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhueske@> wrote:
>
>> Hi Jaromir,
>>
>> thank you very much for reporting this issue.
>> The behavior you are describing is not in line with the documentation of
>> watermarks [1] which clearly states that a watermark of time t tells the
>> system that no more events with a timestamp < t will occur (otherwise they
>> would be considered as late events). Hence, events with a timestamp = t as
>> in your case should be OK and not be considered late.
>>
>> I think this is not intended and probably a bug.
>>
>> I'll loop in some contributors who are more familiar with watermarks and
>> event-time (cc Aljoscha, Kostas K, Stephan).
>>
>> Best, Fabian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
>>
>> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaromir@>:
>>
>> > Hi,
>> >
>> > I am using Flink 1.1.3 and following example doesn't work for me as
>> > expected.
>> >
>> > I've got three input elements with similar timestamp (equaling to window
>> > maxTimestamp). I'm using /event time/ notion of time with
>> > /TumblingEventTimeWindows/.
>> >
>> > I would expect all three elements to be processed in the same window,
>> > because they all have the identical event time timestamp. But the result
>> > I'm getting is just the first element that triggers the window. The rest of
>> > elements are considered as late-comers and discarded.
>> >
>> > From my point of view this is definitely not correct and should be fixed.
>> > Could you clarify if this is correct behavior or bug?
>> >
>> > I think the problem is in /WindowOperator#processWatermark/. Timer should
>> > be fired if and only if the current watermark is strictly larger than
>> > registered timer.
>> >
>> > /
>> > Timer<K, W> timer = watermarkTimersQueue.peek();
>> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
>> > /
>> >
>> > Thanks
>> > Jaromir Vanek
>> >
>> > /
>> > public class WindowingTest {
>> >
>> >   public static void main(String[] args) throws Exception {
>> >     StreamExecutionEnvironment env =
>> >         StreamExecutionEnvironment.createLocalEnvironment();
>> >
>> >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >
>> >     List<Tuple2<Instant, Integer>> elements = Arrays.asList(
>> >         new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
>> >         new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
>> >         new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
>> >     );
>> >
>> >     DataStreamSource<Tuple2<Instant, Integer>> input =
>> >         env.fromCollection(elements);
>> >
>> >     SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
>> >         input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
>> >
>> >     timestamped.timeWindowAll(Time.minutes(1))
>> >         .sum(1)
>> >         .print();
>> >
>> >     // printed result
>> >     // (2016-12-19T10:59:59.999Z,100)
>> >
>> >     env.execute();
>> >   }
>> >
>> >   private static class PunctuatedAssigner
>> >       implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
>> >
>> >     @Override
>> >     public long extractTimestamp(Tuple2<Instant, Integer> element,
>> >         long previousElementTimestamp) {
>> >       return element.f0.toEpochMilli();
>> >     }
>> >
>> >     @Override
>> >     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
>> >         long extractedTimestamp) {
>> >       return new Watermark(extractedTimestamp);
>> >     }
>> >   }
>> > }
>> > /
>> >
>> > --
>> > View this message in context:
>> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
>> > Sent from the Apache Flink Mailing List archive. mailing list archive at
>> > Nabble.com.
>> >
>>

--
View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058p15093.html
Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.