Neither the order in which elements are added to the internal buffers nor
the point in time at which FoldFunction.fold() is called indicates which
window an element is added to. Flink internally keeps a buffer for each
window and emits a window's result once the watermark passes the end of that
window. In your case, several windows can be in flight at any given time:
elements with a timestamp in [19:10:40, 19:10:49] are added to one window,
and elements with a timestamp in [19:10:50, 19:10:59] are added to the next
one.
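
To make the assignment concrete, here is a minimal sketch of how a timestamp
maps to a 10-second tumbling window. It is a simplified model and not Flink's
actual code; the class and method names (WindowAssignmentSketch, windowStart,
windowEnd) are made up for illustration, and it assumes non-negative
millisecond timestamps and no window offset.

```java
import java.time.Instant;

// Simplified model of tumbling-window assignment (hypothetical names,
// not Flink's implementation). Assumes non-negative timestamps in ms.
final class WindowAssignmentSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Inclusive start of the window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Exclusive end of the window that contains the given timestamp.
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // 19:10:49.900 CST and 19:10:50.000 CST, written in UTC.
        long a = Instant.parse("2016-07-04T11:10:49.900Z").toEpochMilli();
        long b = Instant.parse("2016-07-04T11:10:50.000Z").toEpochMilli();
        // The two events land in different windows, no matter when they
        // are processed or printed.
        System.out.println(Instant.ofEpochMilli(windowStart(a)));
        System.out.println(Instant.ofEpochMilli(windowStart(b)));
    }
}
```

The point of the sketch is that the target window depends only on the
element's timestamp, not on arrival order or on when fold() runs.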

Looking at your log, the "100 events in this window" message indicates that
the watermark probably passed the end of the [19:10:40, 19:10:49] window
and the result for that window was emitted. The elements with timestamp
19:10:50 that you see before that in the log are added to the buffer for a
later window that will be emitted at a future time.
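
The interleaving in your log can be reproduced with a toy model that keeps
one counter per window and only emits a window once the watermark has passed
its end. This is a sketch under assumptions, not how Flink is implemented:
the class InFlightWindowsSketch and its methods are hypothetical, and the
watermark is advanced on every event here, whereas your program emits it
periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of per-window buffering with watermark-driven emission.
// Hypothetical names; assumes non-negative timestamps in ms.
final class InFlightWindowsSketch {
    static final long SIZE = 10_000L; // window size in ms
    static final long DELAY = 500L;   // watermark lag, as in the quoted program

    // Window start -> number of elements folded into that window so far.
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    final List<String> log = new ArrayList<>();

    void onEvent(long ts) {
        long start = ts - (ts % SIZE);
        counts.merge(start, 1, Integer::sum); // "fold" into the right window
        log.add("event " + ts + " -> window starting at " + start);
        advanceWatermark(ts - DELAY);
    }

    private void advanceWatermark(long candidate) {
        watermark = Math.max(watermark, candidate);
        // Emit every window whose last millisecond is at or before the watermark.
        while (!counts.isEmpty() && counts.firstKey() + SIZE - 1 <= watermark) {
            Map.Entry<Long, Integer> e = counts.pollFirstEntry();
            log.add(e.getValue() + " events in window starting at " + e.getKey());
        }
    }

    public static void main(String[] args) {
        InFlightWindowsSketch sketch = new InFlightWindowsSketch();
        for (long ts : new long[] {49_800L, 50_000L, 50_200L, 50_400L, 50_600L}) {
            sketch.onEvent(ts);
        }
        sketch.log.forEach(System.out::println);
    }
}
```

In this run, four elements of the later window are logged before the result
of the earlier window is emitted, because the watermark trails the newest
timestamp by DELAY. Both windows are in flight during that time.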

On Tue, 5 Jul 2016 at 04:35 Yukun Guo <gyk....@gmail.com> wrote:

> The output is the timestamp of each event, formatted as a string. (For
> convenience, the payload of each event is exactly its timestamp.) As soon as
> the folding of a time window is finished, the code prints "# events in this
> window", indicating the end of the window.
>
> The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59], ...,
> but in the example above, the events at 19:10:50, which belong to the
> [19:10:50, 19:10:59] window, were mistakenly put in the [19:10:40, 19:10:49]
> one.
>
> On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Could you please elaborate a bit on what exactly the output means and how
>> you derive that events are leaking into the previous window?
>>
>> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:
>>
>>> Thanks for the information. Strangely enough, after I set the time
>>> characteristic to EventTime, the events are leaking into the previous
>>> window:
>>>
>>> ...
>>> Mon, 04 Jul 2016 19:10:49 CST
>>> Mon, 04 Jul 2016 19:10:50 CST # ?
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> 100 events in this window
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:50 CST
>>> Mon, 04 Jul 2016 19:10:51 CST
>>> Mon, 04 Jul 2016 19:10:51 CST
>>>
>>>
>>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> I think it should be as simple as setting event time as the stream time
>>>> characteristic:
>>>>
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>
>>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>>> time if you don't specify a time characteristic. You can enforce an
>>>> event-time window like this:
>>>>
>>>> stream.window(TumblingEventTimeWindows.of(Time.seconds(10)))
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>>
>>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I wrote a program which constructs a WindowedStream to compute
>>>>> periodic data statistics every 10 seconds. However, I found that events
>>>>> have not been strictly grouped into windows of 10s duration, i.e., some
>>>>> events are leaking into the adjacent window.
>>>>>
>>>>> The output is like this:
>>>>>
>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>>> # removed for brevity
>>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>>> 99 events in this window
>>>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong window
>>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>>
>>>>> Here is the code:
>>>>>
>>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>
>>>>> public class TimeWindow {
>>>>>
>>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>>         private final long DELAY = 500;
>>>>>         private long currentWatermark;
>>>>>
>>>>>         @Override
>>>>>         public Watermark getCurrentWatermark() {
>>>>>             return new Watermark(currentWatermark);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public long extractTimestamp(Long event, long l) {
>>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>>             return event;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         final FastDateFormat formatter = FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>>>
>>>>>         DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
>>>>>             private volatile boolean isRunning = true;
>>>>>
>>>>>             @Override
>>>>>             public void run(SourceContext<Long> sourceContext) throws Exception {
>>>>>                 while (isRunning) {
>>>>>                     sourceContext.collect(System.currentTimeMillis());
>>>>>                     Thread.sleep(200);
>>>>>                 }
>>>>>
>>>>>                 sourceContext.close();
>>>>>             }
>>>>>
>>>>>             @Override
>>>>>             public void cancel() {
>>>>>                 isRunning = false;
>>>>>             }
>>>>>         });
>>>>>
>>>>>         stream
>>>>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>>                 .keyBy(new KeySelector<Long, Integer>() {
>>>>>                     @Override
>>>>>                     public Integer getKey(Long x) throws Exception {
>>>>>                         return 0;
>>>>>                     }
>>>>>                 })
>>>>>                 .timeWindow(Time.seconds(10))
>>>>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>>>>                     @Override
>>>>>                     public Integer fold(Integer count, Long x) throws Exception {
>>>>>                         System.out.println(formatter.format(x));
>>>>>                         return count + 1;
>>>>>                     }
>>>>>                 })
>>>>>                 .map(new MapFunction<Integer, Void>() {
>>>>>                     @Override
>>>>>                     public Void map(Integer count) throws Exception {
>>>>>                         System.out.println(count + " events in this window");
>>>>>                         return null;
>>>>>                     }
>>>>>                 });
>>>>>
>>>>>         env.execute();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> It doesn't always happen, but if you run the program long enough it
>>>>> can be observed for sure.
>>>>> Adjusting the DELAY value of watermark generation does not change the
>>>>> behavior.
>>>>>
>>>>
>>>
>
