Aljoscha, thank you very much for the explanation.

It seems that using /AscendingTimestampExtractor/ would really solve my
problem, because emitting a watermark of "currentTimestamp - 1" is the correct
way to wait for all elements with an identical timestamp.
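
A minimal sketch of why the "- 1" matters, in plain Java without any Flink
dependency. It assumes the rule implied by the timer check quoted further
down (a window is closed as soon as its maxTimestamp <= current watermark)
and a punctuated watermark emitted after every element. The class and method
names are hypothetical and exist only for this illustration.

```java
import java.time.Instant;

public class WatermarkOffsetSketch {

    // Counts how many elements are accepted into the window whose last valid
    // timestamp is windowMaxTimestamp. After each element a watermark of
    // (timestamp - offset) is emitted.
    static int elementsInWindow(long[] timestamps, long windowMaxTimestamp, long offset) {
        long currentWatermark = Long.MIN_VALUE;
        int accepted = 0;
        for (long ts : timestamps) {
            // Assumed rule: the window is closed once the watermark has
            // reached the window's max timestamp.
            boolean windowClosed = windowMaxTimestamp <= currentWatermark;
            if (!windowClosed) {
                accepted++;
            }
            // Watermarks never go backwards.
            currentWatermark = Math.max(currentWatermark, ts - offset);
        }
        return accepted;
    }

    public static void main(String[] args) {
        long t = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();
        long[] timestamps = {t, t, t};

        // Watermark equal to the element timestamp (offset 0).
        System.out.println(elementsInWindow(timestamps, t, 0)); // prints 1
        // Watermark of "timestamp - 1" (offset 1).
        System.out.println(elementsInWindow(timestamps, t, 1)); // prints 3
    }
}
```

With offset 0 only the first element is accepted, which matches the result
reported in the original message; with offset 1 all three elements are.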

But I can see this is not true for
/BoundedOutOfOrdernessTimestampExtractor/, where the watermark is used as is,
without the "-1":

public final Watermark getCurrentWatermark() {
  // this guarantees that the watermark never goes backwards.
  long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
  if(potentialWM >= lastEmittedWatermark) {
    lastEmittedWatermark = potentialWM;
  }
  return new Watermark(lastEmittedWatermark);
}

I think those two implementations should use the same principle.
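
As a sketch only, this is roughly what the same principle could look like for
the bounded out-of-orderness case. It is not the actual Flink class: the class
name, the onElement method and the extra "- 1" are assumptions made for
illustration, and maxOutOfOrderness is assumed to be non-negative.

```java
public class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low, but high enough that the subtraction in
        // getCurrentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    // Records the timestamp of an incoming element.
    public void onElement(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    // Hypothetical variant: subtract one extra millisecond so that elements
    // with timestamp == (currentMaxTimestamp - maxOutOfOrderness) are not
    // declared late by the watermark.
    public long getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness - 1;
        // This guarantees that the watermark never goes backwards.
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch extractor = new BoundedOutOfOrdernessSketch(100);
        extractor.onElement(1000L);
        System.out.println(extractor.getCurrentWatermark()); // prints 899
        extractor.onElement(950L);
        System.out.println(extractor.getCurrentWatermark()); // prints 899
    }
}
```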


Aljoscha Krettek-2 wrote
> I'm afraid the doc is wrong here. The JavaDoc on Watermark says this about
> watermarks:
> 
> "A Watermark tells operators that receive it that no elements with a
> timestamp older or equal to the watermark timestamp should arrive at the
> operator."
> 
> The system also relies on this fact, as visible in how timers are read
> from the watermark timers queue and in AscendingTimestampExtractor, which
> has this code:
> 
> public final Watermark getCurrentWatermark() {
>     return new Watermark(currentTimestamp == Long.MIN_VALUE ?
>         Long.MIN_VALUE : currentTimestamp - 1);
> }
> 
> Notice how the watermark is "currentTimestamp - 1", where current timestamp
> is the highest timestamp seen so far and where we assume monotonically
> ascending timestamps.
> 
> Cheers,
> Aljoscha
> 
> On Tue, 20 Dec 2016 at 15:28 Fabian Hueske <fhueske@> wrote:
> 
>> Hi Jaromir,
>>
>> thank you very much for reporting this issue.
>> The behavior you are describing is not in line with the documentation of
>> watermarks [1] which clearly states that a watermark of time t tells the
>> system that no more events with a timestamp < t will occur (otherwise they
>> would be considered as late events). Hence, events with a timestamp = t as
>> in your case should be OK and not be considered late.
>>
>> I think this is not intended and probably a bug.
>>
>> I'll loop in some contributors who are more familiar with watermarks and
>> event-time (cc Aljoscha, Kostas K, Stephan).
>>
>> Best, Fabian
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html#event-time-and-watermarks
>>
>> 2016-12-20 14:56 GMT+01:00 Jaromir Vanek <vanek.jaromir@>:
>>
>> > Hi,
>> >
>> > I am using Flink 1.1.3 and following example doesn't work for me as
>> > expected.
>> >
>> > I've got three input elements with similar timestamp (equaling to window
>> > maxTimestamp). I'm using /event time/ notion of time with
>> > /TumblingEventTimeWindows/.
>> >
>> > I would expect all three elements to be processed in the same window,
>> > because they all have the identical event time timestamp. But the result
>> > I'm getting is just the first element that triggers the window. The rest of
>> > elements are considered as late-comers and discarded.
>> >
>> > From my point of view this is definitely not correct and should be fixed.
>> > Could you clarify if this is correct behavior or bug?
>> >
>> > I think the problem is in /WindowOperator#processWatermark/. Timer should
>> > be fired if and only if the current watermark is strictly larger than
>> > registered timer.
>> >
>> > /
>> > Timer<K, W> timer = watermarkTimersQueue.peek();
>> > if (timer != null && timer.timestamp <= mark.getTimestamp()) {
>> > /
>> >
>> > Thanks
>> > Jaromir Vanek
>> >
>> > /
>> > public class WindowingTest {
>> >
>> >   public static void main(String[] args) throws Exception {
>> >     StreamExecutionEnvironment env =
>> >             StreamExecutionEnvironment.createLocalEnvironment();
>> >
>> >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >
>> >     List<Tuple2<Instant, Integer>> elements = Arrays.asList(
>> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
>> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
>> >             new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
>> >     );
>> >
>> >     DataStreamSource<Tuple2<Instant, Integer>> input =
>> >             env.fromCollection(elements);
>> >
>> >     SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
>> >             input.assignTimestampsAndWatermarks(new PunctuatedAssigner());
>> >
>> >     timestamped.timeWindowAll(Time.minutes(1))
>> >          .sum(1)
>> >          .print();
>> >
>> >     // printed result
>> >     // (2016-12-19T10:59:59.999Z,100)
>> >
>> >     env.execute();
>> >   }
>> >
>> >   private static class PunctuatedAssigner
>> >           implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
>> >
>> >     @Override
>> >     public long extractTimestamp(Tuple2<Instant, Integer> element,
>> >             long previousElementTimestamp) {
>> >       return element.f0.toEpochMilli();
>> >     }
>> >
>> >     @Override
>> >     public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
>> >             long extractedTimestamp) {
>> >       return new Watermark(extractedTimestamp);
>> >     }
>> >   }
>> > }
>> > /
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
>> > Sent from the Apache Flink Mailing List archive. mailing list archive at
>> > Nabble.com.
>> >
>>





--
View this message in context: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058p15093.html
Sent from the Apache Flink Mailing List archive. mailing list archive at 
Nabble.com.
