Thanks for the information. Strangely enough, after I set the time characteristic to EventTime, the events are leaking into the previous window:
...
Mon, 04 Jul 2016 19:10:49 CST
Mon, 04 Jul 2016 19:10:50 CST # ?
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
100 events in this window
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:51 CST
Mon, 04 Jul 2016 19:10:51 CST

On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> I think it should be as simple as setting event time as the stream time
> characteristic:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> The problem is that .timeWindow(Time.seconds(10)) will use processing time
> if you don't specify a time characteristic. You can enforce using an
> event-time window using this:
>
> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>
> Cheers,
> Aljoscha
>
>
> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a program which constructs a WindowedStream to compute periodic
>> data statistics every 10 seconds. However, I found that events have not
>> been strictly grouped into windows of 10s duration, i.e., some events are
>> leaking into the adjacent window.
>>
>> The output is like this:
>>
>> Mon, 04 Jul 2016 11:11:50 CST # 1
>> Mon, 04 Jul 2016 11:11:50 CST # 2
>> # removed for brevity
>> Mon, 04 Jul 2016 11:11:59 CST # 99
>> 99 events in this window
>> Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
>> Mon, 04 Jul 2016 11:12:00 CST
>>
>> Here is the code:
>>
>> import org.apache.commons.lang3.time.FastDateFormat;
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> public class TimeWindow {
>>
>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>         private final long DELAY = 500;
>>         private long currentWatermark;
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             return new Watermark(currentWatermark);
>>         }
>>
>>         @Override
>>         public long extractTimestamp(Long event, long l) {
>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>             return event;
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>
>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>             private volatile boolean isRunning = true;
>>
>>             @Override
>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>                 while (isRunning) {
>>                     sourceContext.collect(System.currentTimeMillis());
>>                     Thread.sleep(200);
>>                 }
>>
>>                 sourceContext.close();
>>             }
>>
>>             @Override
>>             public void cancel() {
>>                 isRunning = false;
>>             }
>>         });
>>
>>         stream
>>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>>             .keyBy(new KeySelector<Long, Integer>() {
>>                 @Override
>>                 public Integer getKey(Long x) throws Exception {
>>                     return 0;
>>                 }
>>             })
>>             .timeWindow(Time.seconds(10))
>>             .fold(0, new FoldFunction<Long, Integer>() {
>>                 @Override
>>                 public Integer fold(Integer count, Long x) throws Exception {
>>                     System.out.println(formatter.format(x));
>>                     return count + 1;
>>                 }
>>             })
>>             .map(new MapFunction<Integer, Void>() {
>>                 @Override
>>                 public Void map(Integer count) throws Exception {
>>                     System.out.println(count + " events in this window");
>>                     return null;
>>                 }
>>             });
>>
>>         env.execute();
>>     }
>> }
>>
>>
>> It doesn't always happen, but if you run the program long enough it can
>> be observed for sure.
>> Adjusting the DELAY value of watermark generation does not change the
>> behavior.
>>
>
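The ordering in the EventTime output above can be reasoned about with a small, single-threaded model in plain Java. This is a hedged sketch, not Flink's code: the class `WatermarkWindowSketch` and its methods are hypothetical, timestamps are assumed to arrive in order, and the watermark is assumed to be checked right after every event. It assigns each timestamp to a 10 s tumbling window (`start = ts - ts % size`), advances the watermark as `max(ts - delay)` like the `TimestampAssigner` in the quoted code, and fires a window once the watermark reaches the window's last millisecond, `end - 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of event-time tumbling windows driven by a
// watermark that lags the highest timestamp seen by `delay` ms. It is NOT
// Flink's implementation; it only mirrors the ordering of "element added"
// versus "window fired" for in-order timestamps.
final class WatermarkWindowSketch {

    // Start of the tumbling window containing `timestamp`, assuming a
    // non-negative timestamp and no window offset.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Replays timestamps in arrival order and logs "add ..." when an event
    // enters its window (where the fold in the thread prints) and "fire ..."
    // when a window is emitted (where the map prints the count).
    // Assumption: the watermark is checked right after every event; polling it
    // less often, as a periodic assigner does, can only make firing later here.
    static List<String> replay(long[] timestamps, long size, long delay) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> count
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            open.merge(start, 1, Integer::sum);
            log.add("add " + ts + " -> [" + start + ", " + (start + size) + ")");

            // Same rule as the TimestampAssigner in the thread.
            watermark = Math.max(watermark, ts - delay);

            // A window fires once the watermark reaches its last timestamp, end - 1.
            while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
                Map.Entry<Long, Integer> w = open.pollFirstEntry();
                log.add("fire [" + w.getKey() + ", " + (w.getKey() + size) + "): "
                        + w.getValue() + " events");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // One event every 200 ms from 48 s to 51 s (a single source, unlike
        // the parallel source in the thread).
        long[] timestamps = new long[16];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = 48_000L + 200L * i;
        }
        replay(timestamps, 10_000L, 500L).forEach(System.out::println);
    }
}
```

Running `main` logs ten `add` lines for the [40000, 50000) window, then `add` lines for 50000, 50200, 50400 and 50600 in the next window, and only then `fire [40000, 50000): 10 events`. For these timestamps, a delay of 0 still logs `add 50000` before that window fires, because in this model the watermark reaches `end - 1` only once an event stamped at least `end - 1 + delay` has been added.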