Thank you for helping with this issue. Those single-element windows arrive within seconds, and the watermark delay is configured as 60000 seconds.
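To be concrete about what I mean by the watermark delay: the watermark trails the highest event timestamp seen so far by a fixed amount, and an element whose timestamp is at or behind the watermark counts as late. Below is a minimal plain-Java sketch of that logic only. It is not our actual timestamp assigner and does not use the Flink API; the class name LaggingWatermark, its methods, and the delay value in main are hypothetical and chosen just for illustration.

```java
// Hypothetical helper illustrating a watermark that lags the maximum
// observed event timestamp by a fixed delay. Not Flink code.
final class LaggingWatermark {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    LaggingWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Current watermark: highest timestamp seen so far minus the delay. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - delayMillis;
    }

    /**
     * Records an element's event timestamp.
     * Returns true if the element was already at or behind the watermark
     * (that is, late) at the moment it was observed.
     */
    boolean observe(long eventTimestamp) {
        boolean late = eventTimestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return late;
    }

    public static void main(String[] args) {
        // Illustrative delay only; the real job's value and unit may differ.
        LaggingWatermark wm = new LaggingWatermark(60_000L);
        long[] timestamps = {1_000_000L, 950_000L, 900_000L};
        for (long ts : timestamps) {
            boolean late = wm.observe(ts);
            System.out.println("ts=" + ts + " late=" + late
                    + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a delay this large relative to the spread of event timestamps, I would expect very few elements to be classified as late, which is why the repeated single-element windows surprise me.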
Here are some sample outputs from my investigation:

...
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.846","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":42,"processAt":"2016-08-01T11:08:05.873","startDate":"2016-07-19T21:36:00.000"}
{"hashCode":-1796184288,"count":9,"processAt":"2016-08-01T11:08:05.874","startDate":"2016-07-19T21:35:00.000"}
{"hashCode":-1800043744,"count":1,"processAt":"2016-08-01T11:08:05.889","startDate":"2016-07-19T21:33:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1798107744,"count":1,"processAt":"2016-08-01T11:08:05.890","startDate":"2016-07-19T21:34:00.000"}
{"hashCode":-1794280288,"count":1,"processAt":"2016-08-01T11:08:05.891","startDate":"2016-07-19T21:36:00.000"}
...

Note that the same window (same "startDate" and "hashCode") is emitted several times, mostly with a count of 1.

"processAt" was generated as follows:

    @Override
    public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
                      Collector<JSONObject> collector) throws Exception {
        // Start of the event-time window this call is firing for.
        DateTime startTs = new DateTime(timeWindow.getStart());
        JSONObject jsonObject = new JSONObject();

        // Count the elements in this window firing.
        // (Loop variable type matches the Iterable<JSONObject> parameter.)
        int sum = 0;
        for (JSONObject value : values) {
            sum += 1;
        }

        // Wall-clock time at which the window function ran (Joda-Time).
        DateTime current = new DateTime();

        jsonObject.put("startDate", startTs.toString());
        jsonObject.put("count", sum);
        jsonObject.put("hashCode", timeWindow.hashCode());
        jsonObject.put("processAt", current.toString());
        collector.collect(jsonObject);
    }

Is there any other mistake we could look into?

Best,

Hung Chang

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeWindowAll-doeesn-t-assign-properly-with-EventTime-tp8201p8229.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.