Hi guys, I have the following code: SingleOutputStreamOperator<Event> lastUserSession = env .socketTextStream("localhost",9000,"\n") .map(new MapFunction<String, Event>() { @Override public Event map(String value)throws Exception { String[] row = value.split(","); return new Event(Long.valueOf(row[0]), row[1], Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime()); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) { @Override public long extractTimestamp(Event element) { return element.timestamp; } }) .keyBy("userId","sessionId") .window(TumblingEventTimeWindows.of(Time.seconds(60))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) .maxBy("length",false);
lastUserSession .timeWindowAll(Time.seconds(60)) .aggregate(new AverageSessionLengthAcrossAllUsers()) .print(); What i'm trying to achieve is to calculate the average session length every 10 seconds. The problem is that once the window length is 60 seconds and a computation is triggered every 10 seconds i will receive duplicate events in my average calculation method so the average will not be correct. If i move ContinuousProcessingTimeTrigger down before AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 seconds. Any other suggestions how to workaround this? Thanks