Hi Aljoscha,

Please excuse the late response; I was busy all of last week.

I used the custom watermark debugger (with 1.1, I changed super.processWatermark(mark) to super.output.emitWatermark(mark)). Surprisingly, with 1.2 only one watermark is printed, at the end of the stream, with the value WM: Watermark @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1 watermarks are printed periodically. I am using the following revision of 1.2-SNAPSHOT: https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
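For comparison, here is what an ascending-timestamp assigner is expected to produce, modelled without Flink. This is a hypothetical, stdlib-only sketch (the class AscendingWatermarkSketch and its methods are made up for illustration, not Flink's code): it tracks the highest timestamp seen and reports highest - 1 as the watermark, which is the rule AscendingTimestampExtractor follows. Flink samples the watermark on a processing-time interval (the auto-watermark interval); here it is sampled every N elements so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of an ascending-timestamp watermark assigner.
// Names are illustrative only; this is not Flink's implementation.
class AscendingWatermarkSketch {

    // Highest timestamp seen so far; Long.MIN_VALUE means "nothing seen yet".
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    /** Records an element timestamp; out-of-order values are counted but never move the clock back. */
    long onElement(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            violations++; // Flink reports such monotony violations (by default via logging)
        }
        return timestamp;
    }

    /** Watermark = highest timestamp - 1: no element at or below it is expected any more. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    /** Feeds the timestamps in order and samples the watermark after every `every` elements. */
    static List<Long> sampleWatermarks(long[] timestamps, int every) {
        AscendingWatermarkSketch assigner = new AscendingWatermarkSketch();
        List<Long> sampled = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            assigner.onElement(timestamps[i]);
            if ((i + 1) % every == 0) {
                sampled.add(assigner.currentWatermark());
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
        // The watermark should advance while the stream is running, not only at the end.
        System.out.println(sampleWatermarks(timestamps, 2)); // [1999, 3999, 5999]
    }
}
```

Under this model the 1.1 output (periodic WM lines) is what one would expect, while a single Long.MAX_VALUE watermark in 1.2 suggests that no intermediate watermark reaches the debugger at all.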
I uploaded the dataset I'm using as input here: https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing. The first column corresponds to the timestamp. You can find the code below. Thank you for your help.

import com.opencsv.CSVParser;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

import java.util.*;

/**
 * Created by ymarzougui on 11/1/2016.
 */
public class SortedSessionsAssigner {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple3<Long,String,String>> waterMarked = env.readTextFile("file:///E:\\data\\anonymized.csv")
                // Parse each CSV line into a Tuple3 whose first field is the timestamp
                .flatMap(new RichFlatMapFunction<String, Tuple3<Long,String,String>>() {
                    public CSVParser csvParser;

                    @Override
                    public void open(Configuration config) {
                        csvParser = new CSVParser(',', '"');
                    }

                    @Override
                    public void flatMap(String in, Collector<Tuple3<Long,String,String>> clctr) throws Exception {
                        String[] result = csvParser.parseLine(in);
                        clctr.collect(Tuple3.of(Long.parseLong(result[0]), result[1], result[2]));
                    }
                })
                // Timestamps come from the first column and are expected to be ascending
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
                        return tuple3.f0;
                    }
                });

        DataStream<Tuple2<TreeMap<String, Double>, Long>> sessions = waterMarked
                .keyBy(1)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
                .apply(new WindowFunction<Tuple3<Long,String,String>, Tuple2<TreeMap<String, Double>, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow,
                                      Iterable<Tuple3<Long, String, String>> iterable,
                                      Collector<Tuple2<TreeMap<String, Double>, Long>> collector) throws Exception {
                        // Count occurrences of the third field and the number of elements in the session
                        TreeMap<String,Double> treeMap = new TreeMap<String, Double>();
                        Long session_count = 0L;
                        for (Tuple3<Long, String, String> tuple3 : iterable) {
                            treeMap.put(tuple3.f2, treeMap.getOrDefault(tuple3.f2, 0.0) + 1);
                            session_count += 1;
                        }
                        collector.collect(Tuple2.of(treeMap, session_count));
                    }
                }).setParallelism(8);

        // Print every element and watermark leaving the timestamp/watermark assigner
        waterMarked.transform("WatermarkDebugger", waterMarked.getType(), new WatermarkDebugger<Tuple3<Long, String, String>>());

        //sessions.writeAsCsv("file:///E:\\data\\sessions.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute("Sorted Sessions Assigner");
    }

    public static class WatermarkDebugger<T>
            extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
        private static final long serialVersionUID = 1L;

        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            System.out.println("ELEMENT: " + element);
            output.collect(element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // 1.2-SNAPSHOT
            super.processWatermark(mark);
            // 1.1-SNAPSHOT
            //super.output.emitWatermark(mark);
            System.out.println("WM: " + mark);
        }
    }
}

Best,
Yassine

2016-12-06 5:57 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi,
> could you please try adding this custom watermark debugger to see what's
> going on with the element timestamps and watermarks:
>
> public static class WatermarkDebugger<T>
>         extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void processElement(StreamRecord<T> element) throws Exception {
>         System.out.println("ELEMENT: " + element);
>         output.collect(element);
>     }
>
>     @Override
>     public void processWatermark(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>         System.out.println("WM: " + mark);
>     }
> }
>
> You can use it like this:
> input.transform("WatermarkDebugger", input.getType(), new WatermarkDebugger<Tuple2<String, Integer>>());
>
> That should give us something to work with.
>
> Cheers,
> Aljoscha
>
> On Mon, 5 Dec 2016 at 18:54 Robert Metzger <rmetz...@apache.org> wrote:
>
> I'll add Aljoscha and Kostas Kloudas to the conversation. They have the
> best overview of the changes to the window operator between 1.1 and 1.2.
>
> On Mon, Dec 5, 2016 at 11:33 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
> I forgot to mention: the watermark extractor is the one included in the
> Flink API.
>
> 2016-12-05 11:31 GMT+01:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
> Hi Robert,
>
> Yes, I am using the same code, just switching the version in pom.xml to
> 1.2-SNAPSHOT and the cluster binaries to the compiled latest master (at
> the time of the question). Here is the watermark assignment:
>
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long,String,String>>() {
>     @Override
>     public long extractAscendingTimestamp(Tuple3<Long,String,String> tuple3) {
>         return tuple3.f0;
>     }
> })
>
> Best,
> Yassine
>
> 2016-12-05 11:24 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
> Hi Yassine,
> are you sure your watermark extractor is the same between the two
> versions? It sounds a bit like the watermarks for the 1.2 code are not
> generated correctly.
>
> Regards,
> Robert
>
> On Sat, Dec 3, 2016 at 9:01 AM, Yassine MARZOUGUI <y.marzou...@mindlytix.com> wrote:
>
> Hi all,
>
> With 1.1-SNAPSHOT, EventTimeSessionWindows fire as soon as the window
> boundaries are detected, but with 1.2-SNAPSHOT the state keeps increasing
> in memory and the window results are not emitted until the whole stream is
> processed. Is this a temporary behaviour due to the developments in
> 1.2-SNAPSHOT, or a bug?
>
> I am using code similar to the following:
>
> env.setParallelism(1);
>
> DataStream<T> sessions = env
>         .readTextFile()
>         .flatMap()
>         .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<>())
>         .keyBy(1)
>         .window(EventTimeSessionWindows.withGap(Time.minutes(5)))
>         .apply().setParallelism(32);
>
> sessions.flatMap(flatMapFunction1).setParallelism(32).writeAsCsv();
> sessions.flatMap(flatMapFunction2).setParallelism(32).writeAsCsv();
>
> Best,
> Yassine
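The symptom in the original question (session state growing until the stream ends) is what one would expect if the window operator never sees a watermark other than the final Long.MAX_VALUE. The following is a hypothetical, stdlib-only model of event-time session windows, not Flink code (SessionFiringSketch, Session and their methods are illustrative names): each element opens a window [ts, ts + gap), overlapping windows of the same key are merged, and a window fires once the watermark reaches its last timestamp, end - 1, which mirrors how Flink's event-time trigger uses TimeWindow.maxTimestamp(). Boundary handling for windows that merely touch may differ from Flink's.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded model of event-time session windows (not Flink code).
class SessionFiringSketch {

    /** A merged session for one key: [start, end) plus its element count. */
    static final class Session {
        final String key;
        long start;
        long end;
        int count = 1;

        Session(String key, long start, long end) {
            this.key = key;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return key + "[" + start + "," + end + ")x" + count;
        }
    }

    private final long gap;
    private final Map<String, List<Session>> open = new HashMap<>();

    SessionFiringSketch(long gap) {
        this.gap = gap;
    }

    /** Adds an element and merges all open sessions of its key that overlap its window. */
    void onElement(String key, long ts) {
        Session merged = new Session(key, ts, ts + gap);
        List<Session> sessions = open.computeIfAbsent(key, k -> new ArrayList<>());
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.start < merged.end && merged.start < s.end) { // strict overlap
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.count += s.count;
                it.remove();
            }
        }
        sessions.add(merged);
    }

    /** Fires (returns and drops) every session whose last timestamp end - 1 is <= watermark. */
    List<Session> onWatermark(long watermark) {
        List<Session> fired = new ArrayList<>();
        for (List<Session> sessions : open.values()) {
            Iterator<Session> it = sessions.iterator();
            while (it.hasNext()) {
                Session s = it.next();
                if (s.end - 1 <= watermark) {
                    fired.add(s);
                    it.remove();
                }
            }
        }
        return fired;
    }

    /** Number of sessions still buffered, i.e. state that has not been emitted yet. */
    int openSessions() {
        int n = 0;
        for (List<Session> sessions : open.values()) {
            n += sessions.size();
        }
        return n;
    }

    public static void main(String[] args) {
        long gap = 5 * 60_000L; // Time.minutes(5) in milliseconds

        SessionFiringSketch periodic = new SessionFiringSketch(gap);
        periodic.onElement("a", 0L);
        periodic.onElement("a", 60_000L);
        // An intermediate watermark past the gap closes the session while the stream is running.
        System.out.println(periodic.onWatermark(10 * 60_000L)); // [a[0,360000)x2]

        SessionFiringSketch finalOnly = new SessionFiringSketch(gap);
        finalOnly.onElement("a", 0L);
        finalOnly.onElement("a", 20 * 60_000L);
        // Without intermediate watermarks both sessions stay buffered...
        System.out.println(finalOnly.openSessions()); // 2
        // ...until the final Long.MAX_VALUE watermark flushes them all at once.
        System.out.println(finalOnly.onWatermark(Long.MAX_VALUE).size()); // 2
    }
}
```

In this model the amount of buffered state depends only on which watermarks arrive, which is why a missing periodic watermark would look exactly like the 1.2 behaviour described above.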