Hi, I think it should be as simple as setting event time as the stream time characteristic:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

The problem is that .timeWindow(Time.seconds(10)) uses processing time unless you set a different time characteristic, because processing time is the default. You can also request an event-time window explicitly by replacing .timeWindow(Time.seconds(10)) in your keyBy(...) chain with:

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

Cheers,
Aljoscha

On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:

> Hi,
>
> I wrote a program which constructs a WindowedStream to compute periodic
> data statistics every 10 seconds. However, I found that events are not
> strictly grouped into windows of 10s duration, i.e., some events are
> leaking into the adjacent window.
>
> The output looks like this:
>
> Mon, 04 Jul 2016 11:11:50 CST # 1
> Mon, 04 Jul 2016 11:11:50 CST # 2
> # removed for brevity
> Mon, 04 Jul 2016 11:11:59 CST # 99
> 99 events in this window
> Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
> Mon, 04 Jul 2016 11:12:00 CST
>
> Here is the code:
>
> import org.apache.commons.lang3.time.FastDateFormat;
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class TimeWindow {
>
>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>         private final long DELAY = 500;
>         private long currentWatermark;
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             return new Watermark(currentWatermark);
>         }
>
>         @Override
>         public long extractTimestamp(Long event, long l) {
>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>             return event;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>
>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>             private volatile boolean isRunning = true;
>
>             @Override
>             public void run(SourceContext<Long> sourceContext) throws Exception {
>                 while (isRunning) {
>                     sourceContext.collect(System.currentTimeMillis());
>                     Thread.sleep(200);
>                 }
>
>                 sourceContext.close();
>             }
>
>             @Override
>             public void cancel() {
>                 isRunning = false;
>             }
>         });
>
>         stream
>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>             .keyBy(new KeySelector<Long, Integer>() {
>                 @Override
>                 public Integer getKey(Long x) throws Exception {
>                     return 0;
>                 }
>             })
>             .timeWindow(Time.seconds(10))
>             .fold(0, new FoldFunction<Long, Integer>() {
>                 @Override
>                 public Integer fold(Integer count, Long x) throws Exception {
>                     System.out.println(formatter.format(x));
>                     return count + 1;
>                 }
>             })
>             .map(new MapFunction<Integer, Void>() {
>                 @Override
>                 public Void map(Integer count) throws Exception {
>                     System.out.println(count + " events in this window");
>                     return null;
>                 }
>             });
>
>         env.execute();
>     }
> }
>
> It doesn't always happen, but if you run the program long enough it can
> be observed for sure.
> Adjusting the DELAY value of the watermark generation does not change the
> behavior.
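A rough plain-Java sketch of tumbling-window assignment (no Flink dependency; the class name, method names, timestamps, and the 100 ms delay are all hypothetical) may make the leak easier to see. It assumes a zero window offset and non-negative epoch-millisecond timestamps, and it rounds a timestamp down to a multiple of the window size. To my understanding, this matches what Flink's tumbling assigners (TumblingEventTimeWindows in Flink 1.x) compute. With event time, the rounded value is the timestamp the source emitted. With processing time, it is the wall-clock time at which the window operator sees the element, so an element stamped just before 11:12:00 that arrives a moment later lands in the next window.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TumblingWindowDemo {

    // Assumption: "CST" in the question means China Standard Time (UTC+8).
    static final ZoneOffset CST = ZoneOffset.ofHours(8);

    // Start of the tumbling window containing the timestamp, assuming a zero
    // offset and a non-negative timestamp in epoch milliseconds.
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Formats epoch milliseconds as HH:mm:ss in CST (window bounds are whole seconds).
    public static String fmt(long epochMillis) {
        return DateTimeFormatter.ISO_LOCAL_TIME.format(Instant.ofEpochMilli(epochMillis).atOffset(CST));
    }

    public static void main(String[] args) {
        long windowSize = 10_000L; // same size as Time.seconds(10)

        // Hypothetical element: the source emits System.currentTimeMillis()
        // = 11:11:59.950 CST on 4 Jul 2016, which becomes its event timestamp.
        long eventTime = 1_467_601_919_950L;
        // Hypothetical 100 ms between emission and the window operator seeing it.
        long processingTime = eventTime + 100L;

        long eventTimeStart = windowStart(eventTime, windowSize);           // uses the element's timestamp
        long processingTimeStart = windowStart(processingTime, windowSize); // uses the operator's clock

        System.out.println("event-time window:      [" + fmt(eventTimeStart) + ", "
                + fmt(eventTimeStart + windowSize) + ")");
        System.out.println("processing-time window: [" + fmt(processingTimeStart) + ", "
                + fmt(processingTimeStart + windowSize) + ")");
    }
}
```

For the same element, main prints an event-time window of [11:11:50, 11:12:00) and a processing-time window of [11:12:00, 11:12:10). The processing-time path never reads the element's timestamp or any watermark, so changing DELAY cannot affect it. This is consistent with what the question observed.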