Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5063#discussion_r152929249 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala --- @@ -116,7 +116,7 @@ abstract class RowTimeUnboundedOver( // discard late record if (timestamp > curWatermark) { // ensure every key just registers one timer - ctx.timerService.registerEventTimeTimer(curWatermark + 1) + ctx.timerService.registerEventTimeTimer(timestamp) --- End diff -- Registering multiple timers on the same timestamp (`curWatermark + 1`), means that only one timer exists that fires exactly once when a watermark is received. By registering timers on different timestamps, we have many timers that all will fire when a watermark is received. I don't think this is what we want.
---