Hi,

I am using Flink 1.1.3, and the following example doesn't work as I
expect.

I've got three input elements with the same timestamp (equal to the
window's maxTimestamp). I'm using /event time/ with
/TumblingEventTimeWindows/.
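
To make the "equal to the window's maxTimestamp" point concrete, here is a minimal sketch of the window arithmetic. It assumes one-minute tumbling windows aligned to the epoch with no offset, and a maxTimestamp of window end minus one millisecond. The class and method names are hypothetical and not part of Flink.

```java
import java.time.Instant;

// Hypothetical helper illustrating tumbling-window bounds; not Flink code.
final class TumblingWindowBounds {

    // Start of the epoch-aligned window that contains the timestamp
    // (assumes a non-negative timestamp and no window offset).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Largest timestamp that still belongs to that window (end - 1 ms).
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L; // one minute in milliseconds
        long ts = Instant.parse("2016-12-19T10:59:59.999Z").toEpochMilli();

        System.out.println("window start:  " + Instant.ofEpochMilli(windowStart(ts, size)));
        System.out.println("max timestamp: " + Instant.ofEpochMilli(maxTimestamp(ts, size)));
        // The element timestamp coincides with the window's last valid timestamp.
        System.out.println(maxTimestamp(ts, size) == ts); // prints "true"
    }
}
```

Under these assumptions every element in the example sits on the very last millisecond of the window from 10:59:00.000 to 11:00:00.000.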

I would expect all three elements to be processed in the same window,
because they all have the identical event time timestamp. However, the
result I'm getting contains only the first element, which triggers the
window. The remaining elements are treated as late and discarded.

From my point of view this is definitely not correct and should be fixed.
Could you clarify whether this is the intended behavior or a bug?

I think the problem is in /WindowOperator#processWatermark/. A timer should
fire if and only if the current watermark is strictly larger than the
registered timer's timestamp. The current check uses <=:

    Timer<K, W> timer = watermarkTimersQueue.peek();
    if (timer != null && timer.timestamp <= mark.getTimestamp()) {
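
The difference between the two comparisons can be sketched with a small stand-alone simulation. This is a simplified model of a single window, not Flink's actual WindowOperator. It assumes that a watermark equal to the element timestamp follows every element, that elements arriving after the window has fired are dropped, and that a final Long.MAX_VALUE watermark is sent when the finite input ends. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-window model; hypothetical, not Flink's WindowOperator.
final class TimerComparisonSketch {

    // Returns the sums emitted by the window for the given input.
    // strict == false models "timer.timestamp <= watermark",
    // strict == true  models "timer.timestamp <  watermark".
    static List<Integer> run(long[] timestamps, int[] values,
                             long windowMaxTimestamp, boolean strict) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        boolean hasContent = false;
        boolean fired = false;

        for (int i = 0; i < timestamps.length; i++) {
            if (fired) {
                continue; // window already fired: element counts as late and is dropped
            }
            sum += values[i];
            hasContent = true;

            long watermark = timestamps[i]; // punctuated watermark after each element
            boolean fire = strict
                    ? windowMaxTimestamp < watermark
                    : windowMaxTimestamp <= watermark;
            if (fire) {
                emitted.add(sum);
                fired = true;
            }
        }

        // Assumed end-of-input watermark (Long.MAX_VALUE) flushes a pending window.
        if (!fired && hasContent) {
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long t = 59_999L; // same timestamp for all elements, equal to maxTimestamp
        long[] timestamps = {t, t, t};
        int[] values = {100, 200, 300};

        System.out.println(run(timestamps, values, t, false)); // prints "[100]"
        System.out.println(run(timestamps, values, t, true));  // prints "[600]"
    }
}
```

In this model the non-strict comparison reproduces the observed output (only the first element), while the strict comparison keeps all three elements in the same window.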

Thanks
Jaromir Vanek

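import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowingTest {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // Three elements sharing one timestamp: the last millisecond of the
    // one-minute window that starts at 10:59:00.000.
    List<Tuple2<Instant, Integer>> elements = Arrays.asList(
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 100),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 200),
            new Tuple2<>(Instant.parse("2016-12-19T10:59:59.999Z"), 300)
    );

    DataStreamSource<Tuple2<Instant, Integer>> input =
            env.fromCollection(elements);

    SingleOutputStreamOperator<Tuple2<Instant, Integer>> timestamped =
            input.assignTimestampsAndWatermarks(new PunctuatedAssigner());

    timestamped.timeWindowAll(Time.minutes(1))
         .sum(1)
         .print();

    // printed result (I expected a single record with the sum 600)
    // (2016-12-19T10:59:59.999Z,100)

    env.execute();
  }

  // Emits a watermark equal to the element's timestamp after every element.
  private static class PunctuatedAssigner
          implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {

    @Override
    public long extractTimestamp(Tuple2<Instant, Integer> element,
                                 long previousElementTimestamp) {
      return element.f0.toEpochMilli();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer> lastElement,
                                              long extractedTimestamp) {
      return new Watermark(extractedTimestamp);
    }
  }
}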


