
I wrote a program that constructs a WindowedStream to compute statistics
over the data every 10 seconds. However, I found that events are not
strictly grouped into 10-second windows, i.e., some events leak into the
adjacent window.
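
For reference, a tumbling event-time window selects its window from the element's millisecond timestamp alone. The sketch below is a hypothetical standalone helper, not Flink code. It assumes non-negative timestamps and no window offset. It shows that every timestamp from :50.000 up to :59.999 maps to the same 10-second window, so an event printed as 11:11:59 at second precision belongs to the window that starts at 11:11:50.

```java
// Hypothetical helper (not part of Flink): mimics how a tumbling window of
// length `size` with zero offset is chosen for a non-negative timestamp.
public class TumblingWindowMath {

    // Start of the window containing `timestamp` (both in milliseconds).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds in milliseconds
        long[] timestamps = {50_000L, 59_999L, 60_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", "
                    + windowEnd(ts, size) + ")");
        }
    }
}
```

Running main shows that 50000 and 59999 both map to [50000, 60000), while 60000 starts the next window.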

The output looks like this:

Mon, 04 Jul 2016 11:11:50 CST  # 1
Mon, 04 Jul 2016 11:11:50 CST  # 2
# removed for brevity
Mon, 04 Jul 2016 11:11:59 CST  # 99
99 events in this window
Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
Mon, 04 Jul 2016 11:12:00 CST

Here is the code:

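import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeWindow {

    // Uses each event's value (epoch millis) as its timestamp and emits
    // watermarks that trail the highest timestamp seen so far by DELAY ms.
    private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
        private final long DELAY = 500;
        private long currentWatermark;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentWatermark);
        }

        @Override
        public long extractTimestamp(Long event, long previousElementTimestamp) {
            currentWatermark = Math.max(currentWatermark, event - DELAY);
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        final FastDateFormat formatter =
                FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Windows and watermarks below are based on event time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (isRunning) {
                    // Loop body was lost from the original post; assumed here:
                    // emit the current time, print it, then pause ~100 ms.
                    long now = System.currentTimeMillis();
                    System.out.println(formatter.format(now));
                    sourceContext.collect(now);
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        stream
                .assignTimestampsAndWatermarks(new TimestampAssigner())
                // Send every event to the same key so one window sees all of them.
                .keyBy(new KeySelector<Long, Integer>() {
                    @Override
                    public Integer getKey(Long x) throws Exception {
                        return 0;
                    }
                })
                .timeWindow(Time.seconds(10))
                // Count the events that fall into each 10-second window.
                .fold(0, new FoldFunction<Long, Integer>() {
                    @Override
                    public Integer fold(Integer count, Long x) throws Exception {
                        return count + 1;
                    }
                })
                .map(new MapFunction<Integer, Void>() {
                    @Override
                    public Void map(Integer count) throws Exception {
                        System.out.println(count + " events in this window");
                        return null;
                    }
                });

        env.execute();
    }
}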


It doesn't happen every time, but if you run the program long enough it
eventually shows up.
Adjusting the DELAY value of watermark generation does not change the
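
A related question is when a window is actually evaluated. Below is a rough simulation, again hypothetical and not Flink code, of the watermark rule in TimestampAssigner combined with Flink's event-time firing condition: a window [start, end) fires once a watermark >= end - 1 has arrived. It assumes strictly increasing timestamps from a single source. It also ignores two things: Flink only polls getCurrentWatermark() once per auto-watermark interval, and an operator with several inputs uses the minimum of their watermarks.

```java
// Hypothetical simulation (not Flink code) of the post's periodic watermark
// rule and of the event-time condition under which a window fires.
public class WatermarkFiringSketch {

    // Same value as DELAY in the post's TimestampAssigner.
    static final long DELAY = 500L;

    // Starts at 0, like the uninitialised field in the post.
    private long currentWatermark = 0L;

    // Mirrors extractTimestamp(): track the highest timestamp minus DELAY.
    long onEvent(long timestamp) {
        currentWatermark = Math.max(currentWatermark, timestamp - DELAY);
        return timestamp;
    }

    // Mirrors getCurrentWatermark().
    long currentWatermark() {
        return currentWatermark;
    }

    // A window [start, end) is due once the watermark reaches end - 1.
    static boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    // Smallest event timestamp whose watermark reaches end - 1.
    static long firstTriggeringTimestamp(long windowEnd) {
        return windowEnd - 1 + DELAY;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L; // window [50_000, 60_000)
        WatermarkFiringSketch sim = new WatermarkFiringSketch();
        for (long ts = 59_900L; ts <= 60_600L; ts += 100L) {
            sim.onEvent(ts);
            long wm = sim.currentWatermark();
            System.out.println("event " + ts + " -> watermark " + wm
                    + " due=" + windowFires(wm, windowEnd));
        }
        System.out.println("first triggering timestamp: "
                + firstTriggeringTimestamp(windowEnd));
    }
}
```

With DELAY = 500, the window ending at 60000 can only fire after an event with timestamp 60499 or later has been seen. Changing DELAY moves that point, but it does not change which window an event is assigned to.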

