Could you please elaborate a bit on what exactly the output means and how
you derive that events are leaking into the previous window?
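
One possible reading of the event-time output quoted below, offered as an assumption and not something this thread confirms: the fold function prints each event as it arrives, while the count line is printed only after the watermark has passed the end of the window. Because the watermark trails the newest event by DELAY, a few events that belong to the next window get printed before the previous window's count shows up, even though each event is counted in the correct window. The following is a plain-Java simulation with no Flink dependency. `WatermarkLagSketch` and `simulate` are hypothetical names, and the sketch advances the watermark after every event, whereas a periodic assigner would do so on a timer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkLagSketch {
    static final long WINDOW_SIZE = 10_000; // 10 s tumbling windows, as in the thread
    static final long DELAY = 500;          // watermark lag from the quoted TimestampAssigner
    static final long EVENT_GAP = 200;      // one event every 200 ms, as in the quoted source

    /** Returns the lines a "print in fold, then print the count" pipeline would emit, in order. */
    static List<String> simulate(long firstTimestamp, int eventCount) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> running count
        long watermark = Long.MIN_VALUE;

        for (int i = 0; i < eventCount; i++) {
            long ts = firstTimestamp + i * EVENT_GAP;

            // Assign the event to its window by its own timestamp and "fold" it immediately.
            long windowStart = ts - Math.floorMod(ts, WINDOW_SIZE);
            openWindows.merge(windowStart, 1, Integer::sum);
            log.add("event " + ts);

            // Simplification (assumption): the watermark advances right after each event.
            watermark = Math.max(watermark, ts - DELAY);

            // A window [start, end) fires once the watermark reaches end - 1.
            Iterator<Map.Entry<Long, Integer>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> w = it.next();
                long windowEnd = w.getKey() + WINDOW_SIZE;
                if (windowEnd - 1 > watermark) {
                    break; // later windows end even later, so stop here
                }
                log.add(w.getValue() + " events in window [" + w.getKey() + ", " + windowEnd + ")");
                it.remove();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(9_000, 10).forEach(System.out::println);
    }
}
```

With a first timestamp of 9000, the five events up to 9800 are counted in [0, 10000), but the count line appears only after the events at 10000 through 10600 have already been printed.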

On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:

> Thanks for the information. Strangely enough, after I set the time
> characteristic to EventTime, the events are leaking into the previous
> window:
>
> ...
> Mon, 04 Jul 2016 19:10:49 CST
> Mon, 04 Jul 2016 19:10:50 CST # ?
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> 100 events in this window
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:50 CST
> Mon, 04 Jul 2016 19:10:51 CST
> Mon, 04 Jul 2016 19:10:51 CST
>
>
> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> I think it should be as simple as setting event time as the stream time
>> characteristic:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>> time if you don't specify a time characteristic. You can enforce an
>> event-time window like this:
>>
>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>
>> Cheers,
>> Aljoscha
>>
>>
>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I wrote a program which constructs a WindowedStream to compute periodic
>>> data statistics every 10 seconds. However, I found that events have not
>>> been strictly grouped into windows of 10s duration, i.e., some events are
>>> leaking into the adjacent window.
>>>
>>> The output is like this:
>>>
>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>> # removed for brevity
>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>> 99 events in this window
>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
>>> Mon, 04 Jul 2016 11:12:00 CST
>>>
>>> Here is the code:
>>>
>>> import org.apache.commons.lang3.time.FastDateFormat;
>>> import org.apache.flink.api.common.functions.FoldFunction;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> public class TimeWindow {
>>>
>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>         private final long DELAY = 500;
>>>         private long currentWatermark;
>>>
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             return new Watermark(currentWatermark);
>>>         }
>>>
>>>         @Override
>>>         public long extractTimestamp(Long event, long l) {
>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>             return event;
>>>         }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>
>>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>>             private volatile boolean isRunning = true;
>>>
>>>             @Override
>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>                 while (isRunning) {
>>>                     sourceContext.collect(System.currentTimeMillis());
>>>                     Thread.sleep(200);
>>>                 }
>>>
>>>                 sourceContext.close();
>>>             }
>>>
>>>             @Override
>>>             public void cancel() {
>>>                 isRunning = false;
>>>             }
>>>         });
>>>
>>>         stream
>>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>                 .keyBy(new KeySelector<Long, Integer>() {
>>>                     @Override
>>>                     public Integer getKey(Long x) throws Exception {
>>>                         return 0;
>>>                     }
>>>                 })
>>>                 .timeWindow(Time.seconds(10))
>>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>>                     @Override
>>>                     public Integer fold(Integer count, Long x) throws Exception {
>>>                         System.out.println(formatter.format(x));
>>>                         return count + 1;
>>>                     }
>>>                 })
>>>                 .map(new MapFunction<Integer, Void>() {
>>>                     @Override
>>>                     public Void map(Integer count) throws Exception {
>>>                         System.out.println(count + " events in this window");
>>>                         return null;
>>>                     }
>>>                 });
>>>
>>>         env.execute();
>>>     }
>>> }
>>>
>>>
>>> It doesn't always happen, but if you run the program long enough it can
>>> be observed for sure.
>>> Adjusting the DELAY value of watermark generation does not change the
>>> behavior.
>>>
>>
>
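
The first message quoted above shows an event stamped 11:11:59 being counted in the following window, and the reply attributes this to .timeWindow falling back to processing time. A minimal sketch of that difference, assuming tumbling windows aligned to the epoch with no offset: under processing time the window is chosen from the wall clock at the moment the event reaches the window operator, while under event time it is chosen from the event's own timestamp. `WindowAssignmentSketch`, `windowStart`, and the two timestamps are hypothetical values chosen for illustration.

```java
public class WindowAssignmentSketch {
    static final long WINDOW_SIZE = 10_000; // 10 s tumbling windows, as in the thread

    /** Start of the tumbling window that contains the given time in milliseconds. */
    static long windowStart(long timeMillis) {
        return timeMillis - Math.floorMod(timeMillis, WINDOW_SIZE);
    }

    public static void main(String[] args) {
        long eventTimestamp = 119_998; // hypothetical: stamped by the source 2 ms before a boundary
        long arrivalTime = 120_003;    // hypothetical: wall clock when the window operator sees it

        // Event time: the window is derived from the timestamp carried by the event.
        System.out.println("event-time window starts at " + windowStart(eventTimestamp));

        // Processing time: the window is derived from the operator's clock on arrival.
        System.out.println("processing-time window starts at " + windowStart(arrivalTime));
    }
}
```

An event created just before a window boundary can therefore land in the next processing-time window if it crosses the boundary in transit, while an event-time window still places it by its own timestamp.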
