Could you please elaborate a bit on what exactly the output means and how you derive that events are leaking into the previous window?
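
For reference, here is a minimal sketch of how I would expect a 10-second tumbling window to be chosen for a given timestamp. It assumes windows are aligned to the epoch with no offset. The class name, method name, and timestamps are made up for illustration; this is not Flink's actual code.

```java
public class WindowAlignment {

    // Start (inclusive) of the epoch-aligned tumbling window containing the timestamp.
    // Assumes non-negative timestamps and no window offset.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L;                 // 10-second window, in milliseconds
        long first = 1_467_630_649_900L;     // hypothetical event timestamp
        long second = first + 200L;          // next event, 200 ms later

        // Prints 1467630640000
        System.out.println(windowStart(first, size));
        // Prints 1467630650000: the second event starts a new window
        System.out.println(windowStart(second, size));
    }
}
```

Under that assumption, membership depends only on the event's timestamp, so it would help to print the millisecond value next to the formatted time when checking which window an event should belong to.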

On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:

> Thanks for the information. Strangely enough, after I set the time
> characteristic to EventTime, the events are leaking into the previous
> window:
>
> ...
> Mon, 04 Jul 2016 19:10:49 CST
> Mon, 04 Jul 2016 19:10:50 CST # ?
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> 100 events in this window
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:51 CST
> Mon, 04 Jul 2016 19:10:51 CST
>
>
> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> I think it should be as simple as setting event time as the stream time
>> characteristic:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>> time if you don't specify a time characteristic. You can force an
>> event-time window like this:
>>
>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I wrote a program which constructs a WindowedStream to compute periodic
>>> data statistics every 10 seconds. However, I found that events are not
>>> strictly grouped into windows of 10s duration, i.e., some events are
>>> leaking into the adjacent window.
>>>
>>> The output is like this:
>>>
>>> Mon, 04 Jul 2016 11:11:50 CST # 1
>>> Mon, 04 Jul 2016 11:11:50 CST # 2
>>> # removed for brevity
>>> Mon, 04 Jul 2016 11:11:59 CST # 99
>>> 99 events in this window
>>> Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
>>> Mon, 04 Jul 2016 11:12:00 CST
>>>
>>> Here is the code:
>>>
>>> import org.apache.commons.lang3.time.FastDateFormat;
>>> import org.apache.flink.api.common.functions.FoldFunction;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> public class TimeWindow {
>>>
>>>     private static class TimestampAssigner implements
>>>             AssignerWithPeriodicWatermarks<Long> {
>>>         private final long DELAY = 500;
>>>         private long currentWatermark;
>>>
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             return new Watermark(currentWatermark);
>>>         }
>>>
>>>         @Override
>>>         public long extractTimestamp(Long event, long l) {
>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>             return event;
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final FastDateFormat formatter =
>>>                 FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>         StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.createLocalEnvironment();
>>>
>>>         DataStream<Long> stream = env.addSource(
>>>                 new RichParallelSourceFunction<Long>() {
>>>             private volatile boolean isRunning = true;
>>>
>>>             @Override
>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>                 while (isRunning) {
>>>                     sourceContext.collect(System.currentTimeMillis());
>>>                     Thread.sleep(200);
>>>                 }
>>>
>>>                 sourceContext.close();
>>>             }
>>>
>>>             @Override
>>>             public void cancel() {
>>>                 isRunning = false;
>>>             }
>>>         });
>>>
>>>         stream
>>>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>             .keyBy(new KeySelector<Long, Integer>() {
>>>                 @Override
>>>                 public Integer getKey(Long x) throws Exception {
>>>                     return 0;
>>>                 }
>>>             })
>>>             .timeWindow(Time.seconds(10))
>>>             .fold(0, new FoldFunction<Long, Integer>() {
>>>                 @Override
>>>                 public Integer fold(Integer count, Long x) throws Exception {
>>>                     System.out.println(formatter.format(x));
>>>                     return count + 1;
>>>                 }
>>>             })
>>>             .map(new MapFunction<Integer, Void>() {
>>>                 @Override
>>>                 public Void map(Integer count) throws Exception {
>>>                     System.out.println(count + " events in this window");
>>>                     return null;
>>>                 }
>>>             });
>>>
>>>         env.execute();
>>>     }
>>> }
>>>
>>>
>>> It doesn't always happen, but if you run the program long enough it can
>>> be observed for sure.
>>> Adjusting the DELAY value of watermark generation does not change the
>>> behavior.
>>>
>>
>