Thanks for the clarification, Aljoscha. I opened https://issues.apache.org/jira/browse/FLINK-5375 to fix this issue.

Best, Fabian

2016-12-20 17:58 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> watermarks:
>
> "A Watermark tells operators that receive it that no elements with a
> timestamp older or equal to the watermark timestamp should arrive at the
> operator."
>
> The system also relies on this fact, as visible in how timers are read from
> the watermark timers queue and in AscendingTimestampExtractor, which has
> this code:
>
> public final Watermark getCurrentWatermark() {
>     return new Watermark(currentTimestamp == Long.MIN_VALUE ?
>             Long.MIN_VALUE : currentTimestamp - 1);
> }
>
> Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
> is the highest timestamp seen so far and where we assume monotonically
> ascending timestamps.
>
> Cheers,
> Aljoscha
>
> On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Jaromir,
> >
> > thank you very much for reporting this issue.
> > The behavior you are describing is not in line with the documentation of
> > watermarks [1], which clearly states that a watermark of time t tells the
> > system that no more events with a timestamp < t will occur (otherwise they
> > would be considered late events). Hence, events with a timestamp = t, as
> > in your case, should be OK and not be considered late.
> >
> > I think this is not intended and probably a bug.
> >
> > I'll loop in some contributors who are more familiar with watermarks and
> > event time (cc Aljoscha, Kostas K, Stephan).
> >
> > Best, Fabian
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
> >
> > 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaro...@gmail.com>:
> >
> > > Hi,
> > >
> > > I am using Flink 1.1.3 and the following example doesn't work for me as
> > > expected.
> > >
> > > I have three input elements with identical timestamps (equal to the
> > > window's maxTimestamp). I'm using the /event time/ notion of time with
> > > /TumblingEventTimeWindows/.
> > >
> > > I would expect all three elements to be processed in the same window,
> > > because they all have the same event-time timestamp. But the only result
> > > I get is the first element, which triggers the window. The remaining
> > > elements are treated as late-comers and discarded.
> > >
> > > From my point of view this is definitely not correct and should be fixed.
> > > Could you clarify whether this is correct behavior or a bug?
> > >
> > > I think the problem is in /WindowOperator#processWatermark/. A timer
> > > should be fired if and only if the current watermark is strictly larger
> > > than the registered timer's timestamp, but the current check also fires
> > > it when they are equal:
> > >
> > > Timer<K, W> timer = watermarkTimersQueue.peek();
> > > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> > >
> > > Thanks
> > > Jaromir Vanek
> > >
> > > import java.time.Instant;
> > > import java.util.Arrays;
> > > import java.util.List;
> > >
> > > import org.apache.flink.api.java.tuple.Tuple2;
> > > import org.apache.flink.streaming.api.TimeCharacteristic;
> > > import org.apache.flink.streaming.api.datastream.DataStreamSource;
> > > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> > > import org.apache.flink.streaming.api.watermark.Watermark;
> > > import org.apache.flink.streaming.api.windowing.time.Time;
> > >
> > > public class WindowingTest {
> > >
> > >     public static void main(String[] args) throws Exception {
> > >         StreamExecutionEnvironment env =
> > >                 StreamExecutionEnvironment.createLocalEnvironment();
> > >
> > >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > >         // three elements with the same timestamp, 1 ms before the end of
> > >         // the one-minute window [10:59:00.000, 11:00:00.000)
> > >         List<Tuple2<Instant, Integer>> elements = Arrays.asList(
> > >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> > >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> > >                 new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> > >         );
> > >
> > >         DataStreamSource<Tuple2<Instant, Integer>> input =
> > >                 env.fromCollection(elements);
> > >
> > >         SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
> > >                 input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
> > >
> > >         timestamped.timeWindowAll(Time.minutes(1))
> > >                 .sum(1)
> > >                 .print();
> > >
> > >         // printed result
> > >         // (2016-12-19T10:59:59.999Z,100)
> > >
> > >         env.execute();
> > >     }
> > >
> > >     private static class PunctuatedAssigner
> > >             implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> > >
> > >         @Override
> > >         public long extractTimestamp(Tuple2<Instant, Integer> element,
> > >                                      long previousElementTimestamp) {
> > >             return element.f0.toEpochMilli();
> > >         }
> > >
> > >         // emits a watermark equal to the element's own timestamp after every element
> > >         @Override
> > >         public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
> > >                                                   long extractedTimestamp) {
> > >             return new Watermark(extractedTimestamp);
> > >         }
> > >     }
> > > }
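
To make the off-by-one in this thread concrete, here is a small, self-contained Java model of the behavior discussed. It is a hypothetical simplification, not Flink's WindowOperator or any Flink API: the class WatermarkModel and its methods run and fireTimers are made up for illustration, and the model assumes zero allowed lateness, a tumbling window that fires once when the watermark reaches its maxTimestamp and is then discarded, and a final Long.MAX_VALUE watermark at end of input. Feeding it the three elements from Jaromir's example suggests that a watermark equal to the element timestamp lets only the first element in, while a watermark of timestamp - 1 (what AscendingTimestampExtractor emits) keeps all three.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.LongUnaryOperator;

/**
 * Hypothetical, simplified model of event-time tumbling windows (NOT Flink's API).
 * Assumptions: allowed lateness is 0, a window fires once when the watermark reaches
 * its max timestamp and is then discarded, and end of input is a Long.MAX_VALUE watermark.
 */
public class WatermarkModel {

    /** Returns the window sums emitted for the given {timestamp, value} pairs. */
    static List<Long> run(long[][] elements, long windowSize, LongUnaryOperator watermarkFor) {
        // open windows keyed by their maxTimestamp, which is also the timer time
        TreeMap<Long, Long> sumsByMaxTimestamp = new TreeMap<>();
        List<Long> emitted = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;

        for (long[] element : elements) {
            long ts = element[0];
            long windowStart = ts - Math.floorMod(ts, windowSize);
            long maxTimestamp = windowStart + windowSize - 1;

            // late: the watermark already covers the window's max timestamp
            if (maxTimestamp <= currentWatermark) {
                continue; // dropped
            }
            sumsByMaxTimestamp.merge(maxTimestamp, element[1], Long::sum);

            // punctuated assigner: the element is forwarded first, then one watermark
            currentWatermark = Math.max(currentWatermark, watermarkFor.applyAsLong(ts));
            fireTimers(sumsByMaxTimestamp, currentWatermark, emitted);
        }
        // end of input: a final Long.MAX_VALUE watermark flushes all remaining windows
        fireTimers(sumsByMaxTimestamp, Long.MAX_VALUE, emitted);
        return emitted;
    }

    /** Fires every timer whose timestamp is <= the watermark, like the check quoted in the thread. */
    static void fireTimers(TreeMap<Long, Long> windows, long watermark, List<Long> out) {
        while (!windows.isEmpty() && windows.firstKey() <= watermark) {
            out.add(windows.pollFirstEntry().getValue());
        }
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long[][] elements = {{t, 100}, {t, 200}, {t, 300}};
        long oneMinute = 60_000L;

        // watermark equal to the element timestamp, as in PunctuatedAssigner
        System.out.println(run(elements, oneMinute, ts -> ts));     // [100]
        // watermark one millisecond behind, like AscendingTimestampExtractor
        System.out.println(run(elements, oneMinute, ts -> ts - 1)); // [600]
    }
}
```

Under these assumptions, the analogous change to the original example would be to return new Watermark(extractedTimestamp - 1) from checkAndGetNextWatermark, which matches the "no elements with a timestamp older or equal to the watermark" contract from the Watermark JavaDoc.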