Hi, I wrote a program that builds a `WindowedStream` to compute statistics over the data every 10 seconds. However, events are not strictly grouped into 10-second windows: some events leak into the adjacent window.
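For reference, as I understand it, a tumbling window assigns an element by rounding its timestamp down to a multiple of the window size. With no window offset, an element stamped 11:11:59.xxx should therefore always belong to [11:11:50, 11:12:00). A minimal standalone sketch of that arithmetic, assuming non-negative epoch-millisecond timestamps; `TumblingWindowSketch`, `windowStart`, and `windowEnd` are hypothetical names, not Flink API. The sketch works in UTC; any time zone whose offset is a whole number of minutes produces the same 10-second boundaries.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class TumblingWindowSketch {

    // Hypothetical helper: start of the tumbling window containing `timestamp`,
    // assuming no window offset and non-negative epoch-millisecond timestamps.
    public static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // The window end is exclusive: the window covers [start, start + size).
    public static long windowEnd(long timestamp, long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds, as in .timeWindow(Time.seconds(10))
        long ts = ZonedDateTime.of(2016, 7, 4, 11, 11, 59, 950_000_000, ZoneOffset.UTC)
                .toInstant().toEpochMilli();
        System.out.println("Window for " + Instant.ofEpochMilli(ts) + ": ["
                + Instant.ofEpochMilli(windowStart(ts, size)) + ", "
                + Instant.ofEpochMilli(windowEnd(ts, size)) + ")");
        // Prints: Window for 2016-07-04T11:11:59.950Z: [2016-07-04T11:11:50Z, 2016-07-04T11:12:00Z)
    }
}
```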
The output looks like this:

```
Mon, 04 Jul 2016 11:11:50 CST # 1
Mon, 04 Jul 2016 11:11:50 CST # 2
# removed for brevity
Mon, 04 Jul 2016 11:11:59 CST # 99
99 events in this window
Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
Mon, 04 Jul 2016 11:12:00 CST
```

Here is the code:

```java
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeWindow {

    // The watermark trails the largest timestamp seen so far by DELAY ms.
    private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
        private final long DELAY = 500;
        private long currentWatermark;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentWatermark);
        }

        @Override
        public long extractTimestamp(Long event, long l) {
            currentWatermark = Math.max(currentWatermark, event - DELAY);
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Each event is the current wall-clock time, emitted every 200 ms.
        DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (isRunning) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(200);
                }
                sourceContext.close();
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        stream
            .assignTimestampsAndWatermarks(new TimestampAssigner())
            // Single key: every event goes to the same keyed window.
            .keyBy(new KeySelector<Long, Integer>() {
                @Override
                public Integer getKey(Long x) throws Exception {
                    return 0;
                }
            })
            .timeWindow(Time.seconds(10))
            .fold(0, new FoldFunction<Long, Integer>() {
                @Override
                public Integer fold(Integer count, Long x) throws Exception {
                    System.out.println(formatter.format(x));
                    return count + 1;
                }
            })
            .map(new MapFunction<Integer, Void>() {
                @Override
                public Void map(Integer count) throws Exception {
                    System.out.println(count + " events in this window");
                    return null;
                }
            });

        env.execute();
    }
}
```

It doesn't happen every time, but if you run the program long enough you will reliably observe it. Adjusting the `DELAY` value used for watermark generation does not change the behavior.
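For comparison, in event time the window an element lands in depends only on its timestamp; the watermark delay should only decide when a window is emitted. The rough standalone simulation here (no Flink dependency; `WatermarkSketch`, `Assigner`, and `simulate` are hypothetical names) mirrors the question's `TimestampAssigner` feeding a 10-second tumbling window, under simplifying assumptions: elements arrive in timestamp order every 200 ms from a single source, the watermark is checked after every element rather than on a periodic interval, and windows still open at the end of the input are flushed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class WatermarkSketch {

    // Hypothetical stand-in for the question's TimestampAssigner:
    // the watermark trails the largest timestamp seen so far by `delay` ms.
    static final class Assigner {
        private final long delay;
        private long currentWatermark; // starts at 0, like the original field

        Assigner(long delay) {
            this.delay = delay;
        }

        long extractTimestamp(long event) {
            currentWatermark = Math.max(currentWatermark, event - delay);
            return event;
        }

        long getCurrentWatermark() {
            return currentWatermark;
        }
    }

    // Returns the element count of each tumbling event-time window, in firing order.
    public static List<Integer> simulate(long[] events, long windowSize, long delay) {
        Assigner assigner = new Assigner(delay);
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> element count
        List<Integer> fired = new ArrayList<>();
        for (long event : events) {
            long ts = assigner.extractTimestamp(event);
            // The window is chosen from the timestamp alone; delay plays no part here.
            long start = ts - (ts % windowSize);
            open.merge(start, 1, Integer::sum);
            // Simplification: check the watermark after every element. A window
            // [start, start + size) fires once the watermark reaches start + size - 1.
            long watermark = assigner.getCurrentWatermark();
            while (!open.isEmpty() && open.firstKey() + windowSize - 1 <= watermark) {
                fired.add(open.pollFirstEntry().getValue());
            }
        }
        // End of the simulated input: flush any windows that are still open.
        while (!open.isEmpty()) {
            fired.add(open.pollFirstEntry().getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        long base = 1_467_601_910_000L; // hypothetical start time, aligned to 10 s
        long[] events = new long[150];  // 30 s of data, one element every 200 ms
        for (int i = 0; i < events.length; i++) {
            events[i] = base + i * 200L;
        }
        for (long delay : new long[] {500L, 2_000L}) {
            System.out.println("DELAY=" + delay + " -> " + simulate(events, 10_000L, delay));
        }
        // Prints:
        // DELAY=500 -> [50, 50, 50]
        // DELAY=2000 -> [50, 50, 50]
    }
}
```

Under these assumptions both delays give identical window contents; a larger `DELAY` only postpones when each window fires (the first window fires on the element at +10.6 s with `DELAY=500`, versus +12.0 s with `DELAY=2000`).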