Yes, it does look like the bounded-out-of-orderness extractor has an
off-by-one error: unlike AscendingTimestampExtractor, it does not subtract 1
when computing the watermark. We should fix it. In most practical cases,
though, this should not change results by much (IMHO).
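
For illustration, here is a minimal sketch of what such a fix could look
like. It is a standalone stand-in, not the actual Flink class: the names
BoundedWatermarkSketch, onElement, currentWatermark and isLate are
hypothetical, and it assumes the only change needed relative to the
getCurrentWatermark() quoted below is the extra "- 1".

```java
/** Hypothetical stand-in for the extractor's watermark logic; not the Flink class. */
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness must be >= 0");
        }
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start just high enough that the subtraction in currentWatermark()
        // cannot underflow before the first element arrives.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Records the timestamp of an incoming element. */
    void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Same logic as the quoted method, plus the "- 1" (the assumed fix). */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness - 1;
        // This guarantees that the watermark never goes backwards.
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    /** Per the Watermark JavaDoc: timestamps older than or equal to the watermark are late. */
    static boolean isLate(long elementTimestamp, long watermark) {
        return elementTimestamp <= watermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(0L);
        sketch.onElement(1000L);
        long wm = sketch.currentWatermark();
        System.out.println(wm);                // 999
        System.out.println(isLate(1000L, wm)); // false
    }
}
```

With a bound of 0 this behaves like the ascending extractor: after an
element with timestamp 1000 the watermark is 999, so further elements with
timestamp 1000 are not treated as late.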

Cheers,
Aljoscha

On Wed, 21 Dec 2016 at 22:43 Jaromir Vanek <vanek.jaro...@gmail.com> wrote:

> Aljoscha, thank you very much for the explanation.
>
> It seems that using /AscendingTimestampExtractor/ would really solve my
> problem, because a watermark of "currentTimestamp - 1" is the correct
> way to wait for all elements with an identical timestamp.
>
> But I can see this is not true for
> /BoundedOutOfOrdernessTimestampExtractor/, where the watermark is used as is,
> without the "- 1".
>
> public final Watermark getCurrentWatermark() {
>   // this guarantees that the watermark never goes backwards.
>   long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
>   if(potentialWM >= lastEmittedWatermark) {
>     lastEmittedWatermark = potentialWM;
>   }
>   return new Watermark(lastEmittedWatermark);
> }
>
> I think these two implementations should follow the same principle.
>
>
> Aljoscha Krettek-2 wrote
> > I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> > watermarks:
> >
> > "A Watermark tells operators that receive it that no elements with a
> > timestamp older or equal to the watermark timestamp should arrive at the
> > operator."
> >
> > The system also relies on this fact, as visible in how timers are read from
> > the watermark timers queue and in AscendingTimestampExtractor, which has
> > this code:
> >
> > public final Watermark getCurrentWatermark() {
> >     return new Watermark(currentTimestamp == Long.MIN_VALUE
> >         ? Long.MIN_VALUE : currentTimestamp - 1);
> > }
> >
> > Notice how the watermark is "currentTimestamp - 1", where currentTimestamp
> > is the highest timestamp seen so far and where we assume monotonically
> > ascending timestamps.
> >
> > Cheers,
> > Aljoscha
> >
> > On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhueske@> wrote:
> >
> >> Hi Jaromir,
> >>
> >> thank you very much for reporting this issue.
> >> The behavior you are describing is not in line with the documentation of
> >> watermarks [1], which clearly states that a watermark of time t tells the
> >> system that no more events with a timestamp < t will occur (otherwise they
> >> would be considered late events). Hence, events with a timestamp = t, as
> >> in your case, should be OK and not be considered late.
> >>
> >> I think this is not intended and probably a bug.
> >>
> >> I'll loop in some contributors who are more familiar with watermarks and
> >> event-time (cc Aljoscha, Kostas K, Stephan).
> >>
> >> Best, Fabian
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
> >>
> >> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaromir@>:
> >>
> >> > Hi,
> >> >
> >> > I am using Flink 1.1.3 and the following example doesn't work for me as
> >> > expected.
> >> >
> >> > I've got three input elements with the same timestamp (equal to the
> >> > window's maxTimestamp). I'm using the /event time/ notion of time with
> >> > /TumblingEventTimeWindows/.
> >> >
> >> > I would expect all three elements to be processed in the same window,
> >> > because they all have the identical event-time timestamp. But the result
> >> > I'm getting is just the first element, which triggers the window. The
> >> > rest of the elements are considered latecomers and discarded.
> >> >
> >> > From my point of view this is definitely not correct and should be fixed.
> >> > Could you clarify whether this is correct behavior or a bug?
> >> >
> >> > I think the problem is in /WindowOperator#processWatermark/. A timer
> >> > should be fired if and only if the current watermark is strictly larger
> >> > than the registered timer's timestamp.
> >> >
> >> > Timer<K, W> timer = watermarkTimersQueue.peek();
> >> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
> >> >
> >> > Thanks
> >> > Jaromir Vanek
> >> >
> >> > public class WindowingTest {
> >> >
> >> >   public static void main(String[] args) throws Exception {
> >> >     StreamExecutionEnvironment env =
> >> >             StreamExecutionEnvironment.createLocalEnvironment();
> >> >
> >> >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> >
> >> >     List<Tuple2<Instant, Integer>> elements = Arrays.asList(
> >> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
> >> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
> >> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
> >> >     );
> >> >
> >> >     DataStreamSource<Tuple2<Instant, Integer>> input =
> >> >             env.fromCollection(elements);
> >> >
> >> >     SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
> >> >             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
> >> >
> >> >     timestamped.timeWindowAll(Time.minutes(1))
> >> >          .sum(1)
> >> >          .print();
> >> >
> >> >     // printed result
> >> >     // (2016-12-19T10:59:59.999Z,100)
> >> >
> >> >     env.execute();
> >> >   }
> >> >
> >> >   private static class PunctuatedAssigner
> >> >           implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> >> >
> >> >     @Override
> >> >     public long extractTimestamp(Tuple2<Instant, Integer> element,
> >> >                                  long previousElementTimestamp) {
> >> >       return element.f0.toEpochMilli();
> >> >     }
> >> >
> >> >     @Override
> >> >     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
> >> >                                               long extractedTimestamp) {
> >> >       return new Watermark(extractedTimestamp);
> >> >     }
> >> >   }
> >> > }
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
> >> > Sent from the Apache Flink Mailing List archive at Nabble.com.
> >> >
> >>
>
