The output is the timestamp of each event, formatted as a string. (For
convenience, the payload of each event is exactly its timestamp.) As soon
as the folding of a time window finishes, the code prints "# events in this
window", marking the end of the window.

The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ...,
but in the example quoted below, the events at 19:10:50, which belong to
the [19:10:50, 19:10:59] window, were mistakenly put in the
[19:10:40, 19:10:49] one.
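
To make the expected grouping concrete, here is a minimal sketch of how I
would expect a timestamp to map to its 10s window. It assumes the windows
are aligned to the epoch with no offset. The names WindowBounds,
windowStart and windowEnd are made up for illustration and are not part of
Flink.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: maps a timestamp to the tumbling
// window that should contain it, assuming epoch-aligned windows, no offset.
public class WindowBounds {

    // Start of the window (inclusive) that contains the given timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End of the window (exclusive) that contains the given timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10s window, in milliseconds

        // Two events one millisecond apart, on either side of a boundary.
        // Given as UTC instants; a whole-hour zone offset does not change
        // the 10s alignment.
        long[] events = {
            Instant.parse("2016-07-04T11:10:49.999Z").toEpochMilli(),
            Instant.parse("2016-07-04T11:10:50.000Z").toEpochMilli()
        };

        for (long t : events) {
            System.out.println(Instant.ofEpochMilli(t)
                    + " -> [" + Instant.ofEpochMilli(windowStart(t, size))
                    + ", " + Instant.ofEpochMilli(windowEnd(t, size)) + ")");
        }
    }
}
```

Under this mapping an event stamped at second 50 opens a new window instead
of closing the previous one, which is the grouping I expected to see.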

On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:

> Could you please elaborate a bit on what exactly the output means and how
> you derive that events are leaking into the previous window?
>
> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:
>
>> Thanks for the information. Strangely enough, after I set the time
>> characteristic to EventTime, the events are leaking into the previous
>> window:
>>
>> ...
>> Mon, 04 Jul 2016 19:10:49 CST
>> Mon, 04 Jul 2016 19:10:50 CST # ?
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> 100 events in this window
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:50 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>> Mon, 04 Jul 2016 19:10:51 CST
>>
>>
>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> I think it should be as simple as setting event time as the stream time
>>> characteristic:
>>>
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>
>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>> time if you don't specify a time characteristic. You can enforce using an
>>> event-time window using this:
>>>
>>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>
>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I wrote a program which constructs a WindowedStream to compute periodic
>>>> data statistics every 10 seconds. However, I found that events have not
>>>> been strictly grouped into windows of 10s duration, i.e., some events are
>>>> leaking into the adjacent window.
>>>>
>>>> The output is like this:
>>>>
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>> # removed for brevity
>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>> 99 events in this window
>>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>
>>>> Here is the code:
>>>>
>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public class TimeWindow {
>>>>
>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>         private final long DELAY = 500;
>>>>         private long currentWatermark;
>>>>
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             return new Watermark(currentWatermark);
>>>>         }
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(Long event, long l) {
>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>             return event;
>>>>         }
>>>>     }
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>>>             private volatile boolean isRunning = true;
>>>>
>>>>             @Override
>>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>>                 while (isRunning) {
>>>>                     sourceContext.collect(System.currentTimeMillis());
>>>>                     Thread.sleep(200);
>>>>                 }
>>>>
>>>>                 sourceContext.close();
>>>>             }
>>>>
>>>>             @Override
>>>>             public void cancel() {
>>>>                 isRunning = false;
>>>>             }
>>>>         });
>>>>
>>>>         stream
>>>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>                 .keyBy(new KeySelector<Long, Integer>() {
>>>>                     @Override
>>>>                     public Integer getKey(Long x) throws Exception {
>>>>                         return 0;
>>>>                     }
>>>>                 })
>>>>                 .timeWindow(Time.seconds(10))
>>>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>>>                     @Override
>>>>                     public Integer fold(Integer count, Long x) throws Exception {
>>>>                         System.out.println(formatter.format(x));
>>>>                         return count + 1;
>>>>                     }
>>>>                 })
>>>>                 .map(new MapFunction<Integer, Void>() {
>>>>                     @Override
>>>>                     public Void map(Integer count) throws Exception {
>>>>                         System.out.println(count + " events in this window");
>>>>                         return null;
>>>>                     }
>>>>                 });
>>>>
>>>>         env.execute();
>>>>     }
>>>> }
>>>>
>>>>
>>>> It doesn't always happen, but if you run the program long enough it can
>>>> be observed for sure.
>>>> Adjusting the DELAY value of watermark generation does not change the
>>>> behavior.
>>>>
>>>
>>
