Many thanks Aljoscha! It can replay computing old instances now. The result looks absolutely correct.
When printint currentTimestamp there are values such as 1456480762777, 1456480762778...which are not -1s. So I'm a bit confused about extractTimestamp(). Can I ask why curTimeStamp = currentTimestamp and curTimeStamp = Math.max(curTimeStamp, biz.time.getMillis()) would make such difference when replaying old instances (event_time << Time.now())? From my understanding this curTimeStamp affects getCurrentWatermark() because it will make getCurrentWatermark() return much smaller value. Cheers, Hung -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5185.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.