The output lists the timestamp of each event, formatted as a string. (For convenience, the payload of each event is exactly its timestamp.) As soon as the fold over a time window finishes, the code prints "# events in this window", marking the end of that window.
The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ..., but in the example quoted below, the events at 19:10:50, which belong to the [19:10:50, 19:10:59] window, were mistakenly put in the [19:10:40, 19:10:49] one.

On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:

> Could you please elaborate a bit on what exactly the output means and how
> you derive that events are leaking into the previous window?
>
> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:
>
>> Thanks for the information. Strangely enough, after I set the time
>> characteristic to EventTime, the events are leaking into the previous
>> window:
>>
>> ...
>> Mon, 04 Jul 2016 19:10:49 CST
>> Mon, 04 Jul 2016 19:10:50 CST # ?
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> 100 events in this window
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>>
>>
>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> I think it should be as simple as setting event time as the stream time
>>> characteristic:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>> time if you don't specify a time characteristic. You can enforce an
>>> event-time window using this:
>>>
>>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a program which constructs a WindowedStream to compute periodic
>>>> data statistics every 10 seconds. However, I found that events have not
>>>> been strictly grouped into windows of 10s duration, i.e., some events are
>>>> leaking into the adjacent window.
>>>>
>>>> The output is like this:
>>>>
>>>> Mon, 04 Jul 2016 11:11:50 CST # 1
>>>> Mon, 04 Jul 2016 11:11:50 CST # 2
>>>> # removed for brevity
>>>> Mon, 04 Jul 2016 11:11:59 CST # 99
>>>> 99 events in this window
>>>> Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>
>>>> Here is the code:
>>>>
>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public class TimeWindow {
>>>>
>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>         private final long DELAY = 500;
>>>>         private long currentWatermark;
>>>>
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             return new Watermark(currentWatermark);
>>>>         }
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(Long event, long l) {
>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>             return event;
>>>>         }
>>>>     }
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>>>             private volatile boolean isRunning = true;
>>>>
>>>>             @Override
>>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>>                 while (isRunning) {
>>>>                     sourceContext.collect(System.currentTimeMillis());
>>>>                     Thread.sleep(200);
>>>>                 }
>>>>
>>>>                 sourceContext.close();
>>>>             }
>>>>
>>>>             @Override
>>>>             public void cancel() {
>>>>                 isRunning = false;
>>>>             }
>>>>         });
>>>>
>>>>         stream
>>>>             .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>             .keyBy(new KeySelector<Long, Integer>() {
>>>>                 @Override
>>>>                 public Integer getKey(Long x) throws Exception {
>>>>                     return 0;
>>>>                 }
>>>>             })
>>>>             .timeWindow(Time.seconds(10))
>>>>             .fold(0, new FoldFunction<Long, Integer>() {
>>>>                 @Override
>>>>                 public Integer fold(Integer count, Long x) throws Exception {
>>>>                     System.out.println(formatter.format(x));
>>>>                     return count + 1;
>>>>                 }
>>>>             })
>>>>             .map(new MapFunction<Integer, Void>() {
>>>>                 @Override
>>>>                 public Void map(Integer count) throws Exception {
>>>>                     System.out.println(count + " events in this window");
>>>>                     return null;
>>>>                 }
>>>>             });
>>>>
>>>>         env.execute();
>>>>     }
>>>> }
>>>>
>>>>
>>>> It doesn't always happen, but if you run the program long enough it can
>>>> be observed for sure.
>>>> Adjusting the DELAY value of watermark generation does not change the
>>>> behavior.
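
For reference, the window boundaries given at the top of this message ([19:10:40, 19:10:49], [19:10:50, 19:10:59], ...) can be checked with a small standalone sketch. It assumes that 10-second tumbling windows are aligned to the Unix epoch with no offset, and that "CST" in the output means UTC+8. The class and method names (`WindowBounds`, `windowStart`, `windowEndExclusive`) are hypothetical and not part of Flink; the sketch only illustrates the alignment arithmetic, not how Flink fires windows.

```java
import java.time.Instant;

// Hypothetical helper, not a Flink class: computes which epoch-aligned
// tumbling window a timestamp falls into (assumes zero window offset).
class WindowBounds {

    // Start of the window containing the timestamp (inclusive), in epoch millis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End of the window containing the timestamp (exclusive), in epoch millis.
    static long windowEndExclusive(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows

        // Assumption: 19:10:49.800 CST and 19:10:50.000 CST, read as UTC+8.
        long[] samples = {
            Instant.parse("2016-07-04T11:10:49.800Z").toEpochMilli(),
            Instant.parse("2016-07-04T11:10:50Z").toEpochMilli()
        };

        for (long t : samples) {
            System.out.println(Instant.ofEpochMilli(t) + " -> ["
                + Instant.ofEpochMilli(windowStart(t, size)) + ", "
                + Instant.ofEpochMilli(windowEndExclusive(t, size)) + ")");
        }
    }
}
```

Under these assumptions, an event stamped 19:10:50.000 starts a new window rather than closing the previous one, which is why the 19:10:50 lines printed before "100 events in this window" look out of place. The half-open interval [start, start + 10s) corresponds to the inclusive second-granularity ranges written above.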