Hi,
I think it should be as simple as setting event time as the stream time
characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

The problem is that .timeWindow(Time.seconds(10)) uses processing time
unless you set a time characteristic, because processing time is the
default. Alternatively, you can force an event-time window explicitly:

stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
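
To illustrate why an event can show up in the "wrong" window, here is a
minimal plain-Java sketch of how a tumbling window is picked from a
timestamp. It assumes windows are aligned to the epoch with no offset, which
is my understanding of the tumbling assigners; the class name, the method
name windowStart, the timestamp, and the 150 ms delay are all made up for
the example. It is not Flink code.

```java
import java.time.Instant;

public class WindowAssignmentSketch {

    /**
     * Start of the tumbling window that contains the given timestamp,
     * assuming windows are aligned to the epoch (no offset).
     */
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, as in the program below

        // Hypothetical event stamped 100 ms before a window boundary ...
        long eventTime = Instant.parse("2016-07-04T11:11:59.900Z").toEpochMilli();
        // ... that reaches the window operator 150 ms later (assumed delay).
        long processingTime = eventTime + 150;

        // Window chosen when the element's own timestamp is used.
        System.out.println("event-time window start:      "
                + Instant.ofEpochMilli(windowStart(eventTime, size)));
        // Window chosen when the operator's wall clock is used.
        System.out.println("processing-time window start: "
                + Instant.ofEpochMilli(windowStart(processingTime, size)));
    }
}
```

With processing time the window depends on when the element arrives at the
operator, so an element stamped just before a boundary can land in the next
window. With event time only the element's own timestamp matters.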

Cheers,
Aljoscha


On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:

> Hi,
>
> I wrote a program which constructs a WindowedStream to compute periodic
> data statistics every 10 seconds. However, I found that events have not
> been strictly grouped into windows of 10s duration, i.e., some events are
> leaking into the adjacent window.
>
> The output is like this:
>
> Mon, 04 Jul 2016 11:11:50 CST  # 1
> Mon, 04 Jul 2016 11:11:50 CST  # 2
> # removed for brevity
> Mon, 04 Jul 2016 11:11:59 CST  # 99
> 99 events in this window
> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
> Mon, 04 Jul 2016 11:12:00 CST
>
> Here is the code:
>
> import org.apache.commons.lang3.time.FastDateFormat;
> import org.apache.flink.api.common.functions.FoldFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class TimeWindow {
>
>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>         private final long DELAY = 500;
>         private long currentWatermark;
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             return new Watermark(currentWatermark);
>         }
>
>         @Override
>         public long extractTimestamp(Long event, long l) {
>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>             return event;
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>
>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>             private volatile boolean isRunning = true;
>
>             @Override
>             public void run(SourceContext<Long> sourceContext) throws Exception {
>                 while (isRunning) {
>                     sourceContext.collect(System.currentTimeMillis());
>                     Thread.sleep(200);
>                 }
>
>                 sourceContext.close();
>             }
>
>             @Override
>             public void cancel() {
>                 isRunning = false;
>             }
>         });
>
>         stream
>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>                 .keyBy(new KeySelector<Long, Integer>() {
>                     @Override
>                     public Integer getKey(Long x) throws Exception {
>                         return 0;
>                     }
>                 })
>                 .timeWindow(Time.seconds(10))
>                 .fold(0, new FoldFunction<Long, Integer>() {
>                     @Override
>                     public Integer fold(Integer count, Long x) throws Exception {
>                         System.out.println(formatter.format(x));
>                         return count + 1;
>                     }
>                 })
>                 .map(new MapFunction<Integer, Void>() {
>                     @Override
>                     public Void map(Integer count) throws Exception {
>                         System.out.println(count + " events in this window");
>                         return null;
>                     }
>                 });
>
>         env.execute();
>     }
> }
>
>
> It doesn't always happen, but if you run the program long enough it can be
> observed for sure.
> Adjusting the DELAY value of watermark generation does not change the
> behavior.
>
