Hi, could you please try adding this custom watermark debugger to see what's going on with the element timestamps and watermarks:

    // Nest this inside your job class. It needs these imports:
    //   org.apache.flink.streaming.api.operators.AbstractStreamOperator
    //   org.apache.flink.streaming.api.operators.OneInputStreamOperator
    //   org.apache.flink.streaming.api.watermark.Watermark
    //   org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            // Print the record (value and timestamp), then forward it unchanged.
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // Let the base class forward the watermark downstream, then log it.
            super.processWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }

You can use it like this:

    input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());

That should give us something to work with.

Cheers,
Aljoscha

On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:

I'll add Aljoscha and Kostas Kloudas to the conversation. They have the best overview of the changes to the window operator between 1.1 and 1.2.

On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

I forgot to mention: the watermark extractor is the one included in the Flink API.

2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Robert,

Yes, I am using the same code, just switching the version in pom.xml to 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at the time of the question). Here is the watermark assignment:

    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
        @Override
        public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
            return tuple3.f0;
        }
    })

Best,
Yassine

2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

Hi Yassine,

Are you sure your watermark extractor is the same between the two versions? It sounds a bit like the watermarks for the 1.2 code are not generated correctly.
Regards,
Robert

On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:

Hi all,

With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing in memory and the window results are not emitted until the whole stream is processed. Is this a temporary behaviour due to the developments in 1.2-SNAPSHOT, or a bug?

I am using code similar to the following:

    env.setParallelism(1);
    DataStream<T> sessions = env
        .readTextFile()
        .flatMap()
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
        .keyBy(1)
        .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
        .apply().setParallelism(32);

    sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
    sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();

Best,
Yassine
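
For anyone comparing the debugger output against what is expected, here is a rough plain-Java sketch (not Flink code) of the behaviour described above for 1.1: a session for a single key should fire as soon as the watermark passes the session end, rather than at the end of the stream. It assumes, as a simplification, that the watermark advances after every element and trails the highest timestamp by one millisecond, which is my understanding of how AscendingTimestampExtractor computes it; in a real job watermarks are emitted periodically. The class and method names (SessionWatermarkSketch, simulate, Fired) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified single-key model of event-time session windows (not Flink API). */
public class SessionWatermarkSketch {

    /** One fired session [start, end) and the watermark that triggered it. */
    public static final class Fired {
        public final long start;
        public final long end;
        public final long watermark;

        Fired(long start, long end, long watermark) {
            this.start = start;
            this.end = end;
            this.watermark = watermark;
        }

        @Override
        public String toString() {
            return "session [" + start + ", " + end + ") fired at WM " + watermark;
        }
    }

    /**
     * Feeds ascending timestamps (milliseconds) through a session model with
     * the given gap. Assumption: the watermark after each element is ts - 1.
     */
    public static List<Fired> simulate(long[] ascendingTimestamps, long gap) {
        List<Fired> fired = new ArrayList<>();
        boolean open = false;
        long start = 0L;
        long end = 0L;
        for (long ts : ascendingTimestamps) {
            if (open && ts <= end) {
                // Element overlaps or touches the open session: extend it.
                end = ts + gap;
            } else {
                if (open) {
                    // The watermark ts - 1 is already past end - 1, so the
                    // previous session can fire now, mid-stream.
                    fired.add(new Fired(start, end, ts - 1));
                }
                start = ts;
                end = ts + gap;
                open = true;
            }
        }
        if (open) {
            // Only the last session has to wait for the final watermark
            // that signals end of input.
            fired.add(new Fired(start, end, Long.MAX_VALUE));
        }
        return fired;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] timestamps = {0L, 1 * minute, 2 * minute, 10 * minute, 11 * minute};
        for (Fired f : simulate(timestamps, 5 * minute)) {
            System.out.println(f);
        }
    }
}
```

If the "WM:" lines printed by the WatermarkDebugger only show a single final watermark at the end of the input, every session would fire at once, as in the last branch of this sketch, which would match the symptom reported for 1.2-SNAPSHOT.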