Hi Jaromir,

thank you very much for reporting this issue.
The behavior you are describing is not in line with the documentation of
watermarks [1] which clearly states that a watermark of time t tells the
system that no more events with a timestamp < t will occur (otherwise they
would be considered as late events). Hence, events with a timestamp = t as
in your case should be OK and not be considered late.

I think this is not intended and probably a bug.

I'll loop in some contributors who are more familiar with watermarks and
event-time (cc Aljoscha, Kostas K, Stephan).

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks

2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaro...@gmail.com>:

> Hi,
>
> I am using Flink 1.1.3 and following example doesn't work for me as
> expected.
>
> I've got three input elements with similar timestamp (equaling to window
> maxTimestamp). I'm using /event time/ notion of time with
> /TumblingEventTimeWindows/.
>
> I would expect all three elements to be processed in the same window,
> because they all have the identical event time timestamp. But the result
> I'm
> getting is just the first element that triggers the window. The rest of
> elements are considered as late-comers and discarded.
>
> From my point of view this is definitely not correct and should be fixed.
> Could you clarify if this is correct behavior or bug?
>
> I think the problem is in /WindowOperator#processWatermark/. Timer should
> be
> fired if and only if the current watermark is strictly larger than
> registered timer.
>
> /
> Timer<K, W> timer = watermarkTimersQueue.peek();
> if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> /
>
> Thanks
> Jaromir Vanek
>
> /
> public class WindowingTest {
>
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironment();
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>     List<Tuple2&lt;Instant, Integer>> elements = Arrays.asList(
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
>             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
>     );
>
>     DataStreamSource<Tuple2&lt;Instant, Integer>> input =
> env.fromCollection(elements);
>
>     SingleOutputStreamOperator<Tuple2&lt;Instant, Integer>> timestamped =
>             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
>
>     timestamped.timeWindowAll(Time.minutes(1))
>          .sum(1)
>          .print();
>
>     // printed result
>     // (2016-12-19T10:59:59.999Z,100)
>
>     env.execute();
>   }
>
>   private static class PunctuatedAssigner
>           implements AssignerWithPunctuatedWatermarks<Tuple2&lt;Instant,
> Integer>> {
>
>     @Override
>     public long extractTimestamp(Tuple2<Instant, Integer> element, long
> previousElementTimestamp) {
>       return element.f0.toEpochMilli();
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer>
> lastElement, long extractedTimestamp) {
>       return new Watermark(extractedTimestamp);
>     }
>   }
> }
> /
>
>
>
> --
> View this message in context: http://apache-flink-mailing-
> list-archive.1008284.n3.nabble.com/Flink-gives-
> incorrect-result-when-event-time-windowing-used-tp15058.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.
>

Reply via email to