Yes, it would seem that the bounded-out-of-orderness extractor has an off-by-one error. We should fix it. In most practical cases, however, this error should not change results by much (IMHO).
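Below is a minimal, Flink-free sketch of what the fix could look like. It is not the actual Flink patch. It reuses the field names from the `BoundedOutOfOrdernessTimestampExtractor#getCurrentWatermark()` code quoted further down in this thread. The class name `BoundedOutOfOrdernessSketch` is hypothetical, and plain `long` values stand in for Flink's `Watermark`. The only change is to subtract 1, as `AscendingTimestampExtractor` does. As a result, elements with a timestamp of exactly `currentMaxTimestamp - maxOutOfOrderness` are not yet treated as late.

```java
// Hypothetical, Flink-free sketch (not the actual Flink patch): a
// bounded-out-of-orderness watermark generator that subtracts 1, the same
// way AscendingTimestampExtractor does. Plain longs stand in for Watermark.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must be >= 0");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so that the first watermark is exactly Long.MIN_VALUE
        // and the subtraction below never underflows.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    // Tracks the highest timestamp seen so far.
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    long getCurrentWatermark() {
        // A watermark w promises "no more elements with timestamp <= w".
        // Elements exactly maxOutOfOrderness behind the maximum are still
        // allowed, so the watermark must stay one millisecond below them.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness - 1;
        // Never let the watermark go backwards.
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(1_000L);
        generator.extractTimestamp(10_000L);
        generator.extractTimestamp(5_000L); // out of order; does not lower the maximum
        System.out.println(generator.getCurrentWatermark()); // prints 8999
    }
}
```

With `maxOutOfOrderness = 0`, this reduces to the `currentTimestamp - 1` rule of `AscendingTimestampExtractor`, apart from that class's special case for `Long.MIN_VALUE`.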
Cheers,
Aljoscha

On Wed, 21 Dec 2016 at 22:43 Jaromir Vanek <vanek.jaro...@gmail.com> wrote:

> Aljoscha, thank you very much for the explanation.
>
> It seems that using /AscendingTimestampExtractor/ would really solve my
> problem, because emitting a watermark of "currentTimestamp - 1" is the
> correct way to wait for all elements with an identical timestamp.
>
> But I can see this is not true for
> /BoundedOutOfOrdernessTimestampExtractor/, where the watermark is used as
> is, without "-1":
>
> public final Watermark getCurrentWatermark() {
>     // this guarantees that the watermark never goes backwards.
>     long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>     if (potentialWM >= lastEmittedWatermark) {
>         lastEmittedWatermark = potentialWM;
>     }
>     return new Watermark(lastEmittedWatermark);
> }
>
> I think those two implementations should follow the same principle.
>
>
> Aljoscha Krettek-2 wrote
> > I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> > watermarks:
> >
> > "A Watermark tells operators that receive it that no elements with a
> > timestamp older or equal to the watermark timestamp should arrive at the
> > operator."
> >
> > The system also relies on this fact, as is visible in how timers are read
> > from the watermark timers queue and in AscendingTimestampExtractor, which
> > has this code:
> >
> > public final Watermark getCurrentWatermark() {
> >     return new Watermark(currentTimestamp == Long.MIN_VALUE ?
> >             Long.MIN_VALUE : currentTimestamp - 1);
> > }
> >
> > Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
> > is the highest timestamp seen so far and where we assume monotonically
> > ascending timestamps.
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhueske@> wrote:
> >
> >> Hi Jaromir,
> >>
> >> thank you very much for reporting this issue.
> >>
> >> The behavior you are describing is not in line with the documentation of
> >> watermarks [1], which clearly states that a watermark of time t tells the
> >> system that no more events with a timestamp < t will occur (otherwise they
> >> would be considered late events). Hence, events with a timestamp = t, as
> >> in your case, should be OK and not be considered late.
> >>
> >> I think this is not intended and probably a bug.
> >>
> >> I'll loop in some contributors who are more familiar with watermarks and
> >> event time (cc Aljoscha, Kostas K, Stephan).
> >>
> >> Best,
> >> Fabian
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
> >>
> >> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaromir@>:
> >>
> >> > Hi,
> >> >
> >> > I am using Flink 1.1.3, and the following example doesn't work for me as
> >> > expected.
> >> >
> >> > I've got three input elements with the same timestamp (equal to the
> >> > window's maxTimestamp). I'm using the /event time/ notion of time with
> >> > /TumblingEventTimeWindows/.
> >> >
> >> > I would expect all three elements to be processed in the same window,
> >> > because they all have the identical event-time timestamp. But the result
> >> > I'm getting is just the first element, which triggers the window. The
> >> > rest of the elements are considered late-comers and discarded.
> >> >
> >> > From my point of view this is definitely not correct and should be
> >> > fixed. Could you clarify whether this is correct behavior or a bug?
> >> >
> >> > I think the problem is in /WindowOperator#processWatermark/. The timer
> >> > should be fired if and only if the current watermark is strictly larger
> >> > than the registered timer.
> >> >
> >> > Timer<K, W> timer = watermarkTimersQueue.peek();
> >> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> >> >
> >> > Thanks
> >> > Jaromir Vanek
> >> >
> >> > public class WindowingTest {
> >> >
> >> >     public static void main(String[] args) throws Exception {
> >> >         StreamExecutionEnvironment env =
> >> >                 StreamExecutionEnvironment.createLocalEnvironment();
> >> >
> >> >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> >
> >> >         List<Tuple2<Instant, Integer>> elements = Arrays.asList(
> >> >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> >> >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> >> >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> >> >         );
> >> >
> >> >         DataStreamSource<Tuple2<Instant, Integer>> input =
> >> >                 env.fromCollection(elements);
> >> >
> >> >         SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
> >> >                 input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
> >> >
> >> >         timestamped.timeWindowAll(Time.minutes(1))
> >> >                 .sum(1)
> >> >                 .print();
> >> >
> >> >         // printed result
> >> >         // (2016-12-19T10:59:59.999Z,100)
> >> >
> >> >         env.execute();
> >> >     }
> >> >
> >> >     private static class PunctuatedAssigner
> >> >             implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> >> >
> >> >         @Override
> >> >         public long extractTimestamp(Tuple2<Instant, Integer> element,
> >> >                                      long previousElementTimestamp) {
> >> >             return element.f0.toEpochMilli();
> >> >         }
> >> >
> >> >         @Override
> >> >         public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
> >> >                                                   long extractedTimestamp) {
> >> >             return new Watermark(extractedTimestamp);
> >> >         }
> >> >     }
> >> > }
> >> >
> >> > --
> >> > View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
> >> > Sent from the Apache Flink Mailing List archive. mailing list archive at
> >> > Nabble.com.
> >
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058p15093.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
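The following small simulation makes the thread's reasoning concrete. It is Flink-free: the class `WatermarkOffByOneDemo` and its `run` method are hypothetical and are not Flink code. The simulation makes these simplifying assumptions:

- There is a single window whose maxTimestamp equals the elements' timestamp (10:59:59.999 for the one-minute window ending at 11:00).
- An element is dropped as late once the watermark has reached the window's maxTimestamp (no allowed lateness).
- The timer fires on the `timer.timestamp <= mark.getTimestamp()` comparison quoted from `WindowOperator#processWatermark`.
- End of input acts as a final watermark that fires any pending timer.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical, Flink-free simulation of one event-time window fed by a
// punctuated assigner; simplified assumptions are noted in the comments.
class WatermarkOffByOneDemo {

    // Returns the sums emitted by a single window. watermarkFor maps an
    // element timestamp to the watermark emitted after that element.
    static List<Integer> run(long[] timestamps, int[] values, long windowMaxTimestamp,
                             LongUnaryOperator watermarkFor) {
        List<Integer> emitted = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        int sum = 0;
        boolean fired = false;
        for (int i = 0; i < timestamps.length; i++) {
            // Assumption: late once the watermark covers the window's max timestamp.
            if (windowMaxTimestamp <= currentWatermark) {
                continue; // dropped as a late element
            }
            sum += values[i];
            // Punctuated assigner: one watermark per element; never goes backwards.
            currentWatermark = Math.max(currentWatermark, watermarkFor.applyAsLong(timestamps[i]));
            // Same comparison as the quoted processWatermark snippet.
            if (!fired && windowMaxTimestamp <= currentWatermark) {
                emitted.add(sum);
                fired = true;
            }
        }
        // Assumption: end of input fires any timer that is still pending.
        if (!fired) {
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long[] timestamps = {t, t, t};
        int[] values = {100, 200, 300};
        // Watermark = extracted timestamp, as in the reported PunctuatedAssigner.
        System.out.println(run(timestamps, values, t, ts -> ts));     // prints [100]
        // Watermark = extracted timestamp - 1, as in AscendingTimestampExtractor.
        System.out.println(run(timestamps, values, t, ts -> ts - 1)); // prints [600]
    }
}
```

When the assigner emits the element timestamp unchanged, the window fires on the first element and the other two are dropped, which matches the reported `(2016-12-19T10:59:59.999Z,100)` output. When it subtracts 1, all three elements stay in the window.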