gt; 300)
> >> > );
> >> >
> >> > DataStreamSource<Tuple2<Instant, Integer>> input =
> >> > env.fromCollection(elements);
> >> >
> >> > SingleOutputStreamOperator<Tuple2<Instant, Integer>>
estampsAndWatermarks(new
>> PunctuatedAssigner());
>> >
>> > timestamped.timeWindowAll(Time.minutes(1))
>> > .sum(1)
>> > .print();
>> >
>> > // printed result
>> > // (2016-12-19T10:59:59.999Z,10
> SingleOutputStreamOperator<Tuple2<Instant, Integer>>
> timestamped =
> > > input.assignTimestampsAndWatermarks(new
> > PunctuatedAssigner());
> > >
> > > timestamped.timeWindowAll(Time.minutes(1))
> > > .sum(1)
> > > .print();
; }
> >
> > private static class PunctuatedAssigner
> > implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
> >
> > @Override
> > public long extractTimestamp(Tuple2<Instant, Integer> element, long
> > previousElementTimestamp) {
> > return element.f0.toEpochMilli();
> > }
> >
> > @Override
> > public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer>
> > lastElement, long extractedTimestamp) {
> > return new Watermark(extractedTimestamp);
> > }
> > }
> > }
> > /
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-gives-incorrect-result-when-event-time-windowing-used-tp15058.html
> > Sent from the Apache Flink Mailing List archive at Nabble.com.
> >
>
ass PunctuatedAssigner
> implements AssignerWithPunctuatedWatermarks<Tuple2<Instant, Integer>> {
>
> @Override
> public long extractTimestamp(Tuple2<Instant, Integer> element, long
> previousElementTimestamp) {
> return element.f0.toEpochMilli();
> }
>
> @Override
>
ousElementTimestamp) {
return element.f0.toEpochMilli();
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<Instant, Integer>
lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
}
}
/
--
View this message in context:
http://apache-flink-mailing-