Thanks for the clarification, Aljoscha.

I added https://issues.apache.org/jira/browse/FLINK-5375 to fix this issue.

Best, Fabian

2016-12-20 17:58 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> watermarks:
>
> "A Watermark tells operators that receive it that no elements with a
> timestamp older or equal to the watermark timestamp should arrive at the
> operator."
>
> The system also relies on this fact, as visible in how timers are read from
> the watermark timers queue and in AscendingTimestampExtractor, which has
> this code:
>
> public final Watermark getCurrentWatermark() {
>     return new Watermark(currentTimestamp == Long.MIN_VALUE ?
>             Long.MIN_VALUE : currentTimestamp - 1);
> }
>
> Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
> is the highest timestamp seen so far and timestamps are assumed to be
> monotonically ascending.
>
> Cheers,
> Aljoscha
>
> On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Jaromir,
> >
> > thank you very much for reporting this issue.
> > The behavior you are describing is not in line with the documentation of
> > watermarks [1], which clearly states that a watermark of time t tells the
> > system that no more events with a timestamp < t will occur (otherwise they
> > would be considered late events). Hence, events with a timestamp = t, as
> > in your case, should be OK and not be considered late.
> >
> > I think this is not intended and probably a bug.
> >
> > I'll loop in some contributors who are more familiar with watermarks and
> > event-time (cc Aljoscha, Kostas K, Stephan).
> >
> > Best, Fabian
> >
> > [1]
> >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
> >
> > 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaro...@gmail.com>:
> >
> > > Hi,
> > >
> > > > I am using Flink 1.1.3 and the following example doesn't work for me as
> > > > expected.
> > > >
> > > > I've got three input elements with the same timestamp (equal to the
> > > > window's maxTimestamp). I'm using the /event time/ notion of time with
> > > > /TumblingEventTimeWindows/.
> > > >
> > > > I would expect all three elements to be processed in the same window,
> > > > because they all have the identical event-time timestamp. But the result
> > > > I'm getting is just the first element, which triggers the window. The
> > > > remaining elements are considered late and are discarded.
> > > >
> > > > From my point of view this is definitely not correct and should be fixed.
> > > > Could you clarify whether this is the correct behavior or a bug?
> > > >
> > > > I think the problem is in /WindowOperator#processWatermark/. A timer should
> > > > be fired if and only if the current watermark is strictly larger than the
> > > > registered timer's timestamp.
> > >
> > > /
> > > Timer<K, W> timer = watermarkTimersQueue.peek();
> > > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> > > /
> > >
> > > Thanks
> > > Jaromir Vanek
> > >
> > > /
> > > public class WindowingTest {
> > >
> > >   public static void main(String[] args) throws Exception {
> > >     StreamExecutionEnvironment env =
> > >             StreamExecutionEnvironment.createLocalEnvironment();
> > >
> > >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > > >     List<Tuple2<Instant, Integer>> elements = Arrays.asList(
> > > >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> > > >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> > > >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> > > >     );
> > > >
> > > >     DataStreamSource<Tuple2<Instant, Integer>> input =
> > > >             env.fromCollection(elements);
> > > >
> > > >     SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
> > > >             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
> > >
> > >     timestamped.timeWindowAll(Time.minutes(1))
> > >          .sum(1)
> > >          .print();
> > >
> > >     // printed result
> > >     // (2016-12-19T10:59:59.999Z,100)
> > >
> > >     env.execute();
> > >   }
> > >
> > > >   private static class PunctuatedAssigner
> > > >           implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> > > >
> > > >     @Override
> > > >     public long extractTimestamp(Tuple2<Instant, Integer> element,
> > > >                                  long previousElementTimestamp) {
> > > >       return element.f0.toEpochMilli();
> > > >     }
> > > >
> > > >     @Override
> > > >     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
> > > >                                               long extractedTimestamp) {
> > >       return new Watermark(extractedTimestamp);
> > >     }
> > >   }
> > > }
> > > /
> > >
> > >
> > >
> > > --
> > > View this message in context: http://apache-flink-mailing-
> > > list-archive.1008284.n3.nabble.com/Flink-gives-
> > > incorrect-result-when-event-time-windowing-used-tp15058.html
> > > Sent from the Apache Flink Mailing List archive. mailing list archive
> at
> > > Nabble.com.
> > >
> >
>
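
Jaromir's report hinges on the element timestamp being exactly the window's maxTimestamp. The plain-Java sketch below checks that arithmetic for a one-minute tumbling window. It assumes the usual tumbling-window convention (start aligned to a multiple of the window size, exclusive end, maxTimestamp = end - 1), which is believed to match what TumblingEventTimeWindows does but is not copied from Flink. The class and method names (WindowBounds, windowStart, maxTimestamp) are hypothetical and not part of Flink's API.

```java
import java.time.Instant;

// Hypothetical helper for illustration only; not a Flink class.
final class WindowBounds {

    // Start of the tumbling window that contains `timestamp`
    // (epoch millis, assumed non-negative; window offset assumed to be 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to that window:
    // the exclusive window end minus one millisecond.
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute, as in timeWindowAll(Time.minutes(1))
        long ts = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();

        System.out.println("window start: " + Instant.ofEpochMilli(windowStart(ts, size)));
        System.out.println("element sits on maxTimestamp: " + (maxTimestamp(ts, size) == ts));
    }
}
```

Under these assumptions, a watermark equal to the element timestamp is also equal to the window's maxTimestamp, which is the boundary case the thread is about.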

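
To make the semantics Aljoscha describes concrete, here is a minimal plain-Java simulation. It is a deliberately simplified model, not Flink's code: it assumes a single window, a punctuated watermark emitted after every element, the `timer.timestamp <= mark.getTimestamp()` comparison quoted from WindowOperator#processWatermark, an equivalent `<=` lateness check (an assumption), and a final flush at end of input. The names WatermarkSimulation, run and watermarkLag are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of one event-time window; not a Flink class.
final class WatermarkSimulation {

    /**
     * Feeds `values` (all carrying event time `ts`) into one window whose timer
     * is registered at `windowMaxTs`. After each element a watermark of
     * `ts - watermarkLag` is emitted. Returns the sums emitted by the window.
     */
    static List<Integer> run(long ts, long windowMaxTs, int[] values, long watermarkLag) {
        List<Integer> fired = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;
        int sum = 0;
        boolean hasState = false;

        for (int value : values) {
            // Assumed lateness check: the window is already covered by the watermark.
            if (windowMaxTs <= currentWatermark) {
                continue; // element is treated as late and dropped
            }
            sum += value;
            hasState = true;

            // Punctuated watermark emitted right after the element.
            currentWatermark = ts - watermarkLag;

            // Same comparison as the quoted processWatermark snippet.
            if (windowMaxTs <= currentWatermark) {
                fired.add(sum);
                sum = 0;
                hasState = false;
            }
        }
        // Assumption: end of a bounded input flushes any remaining window.
        if (hasState) {
            fired.add(sum);
        }
        return fired;
    }

    public static void main(String[] args) {
        int[] values = {100, 200, 300};
        long ts = 1_000L; // element timestamp == window maxTimestamp

        System.out.println("watermark = ts:     " + run(ts, ts, values, 0L));
        System.out.println("watermark = ts - 1: " + run(ts, ts, values, 1L));
    }
}
```

In this model, emitting `new Watermark(extractedTimestamp)` fires the window after the first element and drops the other two, matching the reported output, while emitting `extractedTimestamp - 1` (as AscendingTimestampExtractor does) keeps all three elements in the same window.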