Please refer to https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html for how to assign timestamps. You can apply a map after keyBy and then assign timestamps.
e.g.:

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
  .filter( _.severity == WARNING )
  .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
  .keyBy( _.getGroup )
  .timeWindow(Time.seconds(10))
  .reduce( (a, b) => a.add(b) )
  .addSink(...)

~Pushpendra
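The snippet above passes an assigner to assignTimestampsAndWatermarks without showing what it looks like. Here is a minimal sketch of one possibility, assuming the AssignerWithPeriodicWatermarks interface from the Flink 1.x streaming API. The MyEvent fields (in particular timestamp), the class name BoundedLatenessAssigner, and the fixed out-of-orderness bound are all hypothetical and not taken from the original post.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type; the original post does not show MyEvent's fields.
case class MyEvent(group: String, severity: Int, timestamp: Long)

// Assumption: events arrive at most maxLatenessMs milliseconds out of order.
class BoundedLatenessAssigner(maxLatenessMs: Long)
    extends AssignerWithPeriodicWatermarks[MyEvent] {

  // Highest event timestamp seen so far. The initial value is offset so that
  // subtracting the bound in getCurrentWatermark cannot underflow.
  private var maxSeen: Long = Long.MinValue + maxLatenessMs

  // Called once per element: returns its event time and tracks the maximum.
  override def extractTimestamp(event: MyEvent, previousElementTimestamp: Long): Long = {
    maxSeen = math.max(maxSeen, event.timestamp)
    event.timestamp
  }

  // Called periodically by Flink: the watermark trails the maximum by the bound.
  override def getCurrentWatermark(): Watermark =
    new Watermark(maxSeen - maxLatenessMs)
}
```

Under these assumptions it could be plugged in as stream.assignTimestampsAndWatermarks(new BoundedLatenessAssigner(5000L)), which tolerates events arriving up to five seconds late.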