Hi,

You defined a tumbling window (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows) of 60 seconds that is triggered every 10 seconds. This means that each input element can be processed/averaged up to 6 times (60 s / 10 s = 6 firings per window). There is no way to avoid this if you trigger each window multiple times.
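To make the duplication concrete, here is a simplified Java model. It does not use Flink's API. It models a 60-second tumbling window whose trigger fires every 10 seconds without purging. It assumes that processing time advances in step with event time, so the window fires at offsets 10 s, 20 s, ..., 60 s. Each firing re-emits the key's current result downstream. The session lengths and arrival offsets are hypothetical, and each session is assumed to have a single record.

```java
/**
 * Simplified, Flink-free model (not Flink's API) of a 60 s tumbling window whose
 * trigger FIREs every 10 s without purging. Assumption: processing time advances
 * in step with event time, so the window fires at offsets 10 s, 20 s, ..., 60 s.
 */
public class ContinuousFiringModel {

    /** How many firings include an element that arrives at offsetMs inside the window. */
    static int firingsSeeing(long offsetMs, long windowMs, long intervalMs) {
        int count = 0;
        for (long fire = intervalMs; fire <= windowMs; fire += intervalMs) {
            if (fire > offsetMs) {
                count++; // this firing re-emits the key's current result downstream
            }
        }
        return count;
    }

    /** Average over every record the downstream window receives, duplicates included. */
    static double averageWithDuplicates(long[] lengths, long[] firstSeenOffsetsMs,
                                        long windowMs, long intervalMs) {
        double sum = 0;
        long records = 0;
        for (int i = 0; i < lengths.length; i++) {
            int copies = firingsSeeing(firstSeenOffsetsMs[i], windowMs, intervalMs);
            sum += (double) copies * lengths[i];
            records += copies;
        }
        return sum / records;
    }

    /** Average when each session reaches the downstream window exactly once. */
    static double averageOncePerSession(long[] lengths) {
        double sum = 0;
        for (long length : lengths) {
            sum += length;
        }
        return sum / lengths.length;
    }

    public static void main(String[] args) {
        long[] lengths = {100, 40};       // hypothetical session lengths
        long[] offsets = {5_000, 45_000}; // hypothetical first-seen offsets in the window (ms)
        System.out.println(firingsSeeing(5_000, 60_000, 10_000));                    // 6
        System.out.println(averageWithDuplicates(lengths, offsets, 60_000, 10_000)); // 85.0
        System.out.println(averageOncePerSession(lengths));                          // 70.0
    }
}
```

With these hypothetical inputs, the session first seen at 5 s reaches the downstream average six times. The session first seen at 45 s reaches it twice. Together they pull the average from 70.0 up to 85.0.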
I am not sure what you are trying to achieve, but please refer to the documentation about the different window types (tumbling, sliding, session). Maybe it will clarify things for you: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners

If you want to avoid duplicated processing, use either a tumbling window with the default trigger (which fires once, at the end of the window) or session windows.

Piotrek

> On 21 Dec 2017, at 13:29, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
>
> Hi guys,
> I have the following code:
>
> SingleOutputStreamOperator<Event> lastUserSession = env
>         .socketTextStream("localhost", 9000, "\n")
>         .map(new MapFunction<String, Event>() {
>             @Override
>             public Event map(String value) throws Exception {
>                 String[] row = value.split(",");
>                 return new Event(Long.valueOf(row[0]), row[1],
>                         Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>             }
>         })
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>                     @Override
>                     public long extractTimestamp(Event element) {
>                         return element.timestamp;
>                     }
>                 })
>         .keyBy("userId", "sessionId")
>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>         .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
>         .maxBy("length", false);
>
> lastUserSession
>         .timeWindowAll(Time.seconds(60))
>         .aggregate(new AverageSessionLengthAcrossAllUsers())
>         .print();
>
> What I'm trying to achieve is to calculate the average session length every
> 10 seconds. The problem is that, because the window length is 60 seconds and a
> computation is triggered every 10 seconds, I receive duplicate events in my
> average calculation method, so the average is not correct. If I move
> ContinuousProcessingTimeTrigger down before AverageSessionLengthAcrossAllUsers(),
> then it doesn't trigger every 10 seconds.
>
> Any other suggestions on how to work around this?
>
> Thanks
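Piotrek's first suggestion amounts to deleting the .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10))) line from the quoted job. TumblingEventTimeWindows then falls back to its default event-time trigger and fires once per window.

The Java model below is a sketch, not Flink code. It shows what the job then computes:

- Each (userId, sessionId) key contributes its maximum length exactly once per 60-second window.
- The downstream average counts each key once.

It assumes that all events are on time (no late data) and that both windows are aligned at offset 0. The Event class and the sample values are hypothetical stand-ins for the ones in the question.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Flink-free model (not Flink code) of the quoted job with the keyed 60 s tumbling
 * window on its default event-time trigger: every (userId, sessionId) key emits its
 * max length once per window, and the all-window aggregate averages those results.
 * Assumptions: no late data, and both windows are aligned at offset 0.
 */
public class OncePerWindowAverage {

    /** Hypothetical stand-in for the Event type in the question. */
    static final class Event {
        final long userId;
        final String sessionId;
        final long length;
        final long timestamp; // event time in milliseconds

        Event(long userId, String sessionId, long length, long timestamp) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
            this.timestamp = timestamp;
        }
    }

    /** Window start (ms) -> average of the per-key maximum lengths in that window. */
    static Map<Long, Double> averagePerWindow(List<Event> events, long windowMs) {
        // window start -> (key -> max length), like keyBy(...).window(...).maxBy("length")
        Map<Long, Map<String, Long>> maxPerKey = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestamp - (e.timestamp % windowMs); // non-negative timestamps only
            String key = e.userId + "/" + e.sessionId;
            maxPerKey.computeIfAbsent(windowStart, w -> new HashMap<>())
                     .merge(key, e.length, Long::max);
        }
        // each window fires once, so every key is counted exactly once in its average
        Map<Long, Double> averages = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : maxPerKey.entrySet()) {
            double avg = window.getValue().values().stream()
                    .mapToLong(Long::longValue)
                    .average()
                    .orElse(0.0);
            averages.put(window.getKey(), avg);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(      // hypothetical sample input
                new Event(1, "a", 100, 5_000),
                new Event(1, "a", 120, 20_000),  // same session, longer: only the max counts
                new Event(2, "b", 40, 45_000),
                new Event(3, "c", 60, 70_000));  // falls into the next window
        System.out.println(averagePerWindow(events, 60_000)); // {0=80.0, 60000=60.0}
    }
}
```

The trade-off is that results arrive once per minute, when the watermark passes the end of each window, instead of every 10 seconds.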