Thanks for the information. Strange enough, after I set the time
characteristic to EventTime, the events are leaking into the previous
window:

...
Mon, 04 Jul 2016 19:10:49 CST
Mon, 04 Jul 2016 19:10:50 CST # ?
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
100 events in this window
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:50 CST
Mon, 04 Jul 2016 19:10:51 CST
Mon, 04 Jul 2016 19:10:51 CST


On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I think it should be as simple as setting event time as the stream time
> characteristic:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> The problem is that .timeWindow(Time.seconds(10)) will use processing time
> if you don't specify a time characteristic. You can enforce using an
> event-time window using this:
>
> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>
> Cheers,
> Aljoscha
>
>
> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>
>> Hi,
>>
>> I wrote a program which constructs a WindowedStream to compute periodic
>> data statistics every 10 seconds. However, I found that events have not
>> been strictly grouped into windows of 10s duration, i.e., some events are
>> leaking into the adjacent window.
>>
>> The output is like this:
>>
>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>> # removed for brevity
>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>> 99 events in this window
>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong
>> window
>> Mon, 04 Jul 2016 11:12:00 CST
>>
>> Here is the code:
>>
>> import org.apache.commons.lang3.time.FastDateFormat;
>> import org.apache.flink.api.common.functions.FoldFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import 
>> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import 
>> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>>
>> public class TimeWindow {
>>
>>     private static class TimestampAssigner implements 
>> AssignerWithPeriodicWatermarks<Long> {
>>         private final long DELAY = 500;
>>         private long currentWatermark;
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             return new Watermark(currentWatermark);
>>         }
>>
>>         @Override
>>         public long extractTimestamp(Long event, long l) {
>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>             return event;
>>         }
>>     }
>>
>>     public static void main(String[] args) throws Exception {
>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd 
>> MMM yyyy HH:mm:ss z");
>>         StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.createLocalEnvironment();
>>
>>         DataStream<Long> stream = env.addSource(new 
>> RichParallelSourceFunction<Long>() {
>>             private volatile boolean isRunning = true;
>>
>>             @Override
>>             public void run(SourceContext<Long> sourceContext) throws 
>> Exception {
>>                 while (isRunning) {
>>                     sourceContext.collect(System.currentTimeMillis());
>>                     Thread.sleep(200);
>>                 }
>>
>>                 sourceContext.close();
>>             }
>>
>>             @Override
>>             public void cancel() {
>>                 isRunning = false;
>>             }
>>         });
>>
>>         stream
>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>                 .keyBy(new KeySelector<Long, Integer>() {
>>                     @Override
>>                     public Integer getKey(Long x) throws Exception {
>>                         return 0;
>>                     }
>>                 })
>>                 .timeWindow(Time.seconds(10))
>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>                     @Override
>>                     public Integer fold(Integer count, Long x) throws 
>> Exception {
>>                         System.out.println(formatter.format(x));
>>                         return count + 1;
>>                     }
>>                 })
>>                 .map(new MapFunction<Integer, Void>() {
>>                     @Override
>>                     public Void map(Integer count) throws Exception {
>>                         System.out.println(count + " events in this window");
>>                         return null;
>>                     }
>>                 });
>>
>>         env.execute();
>>     }
>> }
>>
>>
>> It doesn't always happen, but if you run the program long enough it can
>> be observed for sure.
>> Adjusting the DELAY value of watermark generation does not change the
>> behavior.
>>
>

Reply via email to