Hi,

I wrote a program that constructs a WindowedStream to compute periodic
data statistics every 10 seconds. However, I found that events are not
strictly grouped into windows of 10 s duration, i.e., some events
leak into the adjacent window.

The output is like this:

Mon, 04 Jul 2016 11:11:50 CST  # 1
Mon, 04 Jul 2016 11:11:50 CST  # 2
# removed for brevity
Mon, 04 Jul 2016 11:11:59 CST  # 99
99 events in this window
Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
Mon, 04 Jul 2016 11:12:00 CST
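
For reference, this is the grouping I expected, written as a minimal
sketch in plain Java. It assumes a 10 s tumbling window aligned to the
epoch with no offset and non-negative timestamps. The class and method
names are made up for illustration and are not part of Flink, and the
sample values are arbitrary millisecond timestamps, since the output
above only shows whole seconds.

```java
public class WindowBuckets {

    // Start (inclusive) of the tumbling window that contains timestampMs.
    // Assumes windows are aligned to the epoch and timestampMs >= 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of the tumbling window that contains timestampMs.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L;
        // One event just before a window boundary and one exactly on it.
        long[] samples = {119_900L, 120_000L};
        for (long ts : samples) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs)
                    + ", " + windowEnd(ts, sizeMs) + ")");
        }
        // prints:
        // 119900 -> [110000, 120000)
        // 120000 -> [120000, 130000)
    }
}
```

Under that assumption, an event stamped anywhere within 11:11:59 should
be counted in the window that ends at 11:12:00, not in the next one.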

Here is the code:

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TimeWindow {

    private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
        private final long DELAY = 500;
        private long currentWatermark;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentWatermark);
        }

        @Override
        public long extractTimestamp(Long event, long l) {
            currentWatermark = Math.max(currentWatermark, event - DELAY);
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        final FastDateFormat formatter =
                FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();

        DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (isRunning) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(200);
                }

                sourceContext.close();
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        stream
                .assignTimestampsAndWatermarks(new TimestampAssigner())
                .keyBy(new KeySelector<Long, Integer>() {
                    @Override
                    public Integer getKey(Long x) throws Exception {
                        return 0;
                    }
                })
                .timeWindow(Time.seconds(10))
                .fold(0, new FoldFunction<Long, Integer>() {
                    @Override
                    public Integer fold(Integer count, Long x) throws Exception {
                        System.out.println(formatter.format(x));
                        return count + 1;
                    }
                })
                .map(new MapFunction<Integer, Void>() {
                    @Override
                    public Void map(Integer count) throws Exception {
                        System.out.println(count + " events in this window");
                        return null;
                    }
                });

        env.execute();
    }
}


It does not happen in every window, but it reliably shows up if the
program runs long enough.
Changing the DELAY value used for watermark generation does not change
the behavior.
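
To make explicit what I expected DELAY to influence, here is a minimal
sketch of the watermark rule from the assigner above, in plain Java
without Flink. The firing condition (a window [start, end) is complete
once the watermark reaches end - 1) is my understanding of how
event-time windows close, and it only applies if event-time windows are
actually in effect. The class and method names are hypothetical.

```java
public class WatermarkSketch {

    static final long WINDOW_SIZE_MS = 10_000L;

    private final long delayMs;
    private long watermark = Long.MIN_VALUE;

    WatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Same rule as extractTimestamp above: the watermark trails the
    // largest timestamp seen so far by delayMs.
    void observe(long timestampMs) {
        watermark = Math.max(watermark, timestampMs - delayMs);
    }

    // Assumption: the window containing timestampMs is complete once
    // the watermark has reached the window's last millisecond (end - 1).
    // An event arriving after that point would be late for its window.
    boolean isLate(long timestampMs) {
        long end = timestampMs - (timestampMs % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
        return watermark >= end - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch(500L);
        sketch.observe(119_900L);
        System.out.println(sketch.isLate(119_900L)); // watermark is 119400
        sketch.observe(120_499L);
        System.out.println(sketch.isLate(119_900L)); // watermark is 119999
    }
}
```

With this rule, a larger DELAY should keep a window open longer for
stragglers, which is why I expected changing it to affect the output.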
