Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3641

Oh, I just remembered that the OVER RANGE semantics are slightly different. All rows that arrive in the same millisecond need to get the same aggregation value. So we need to not only retract all records that are too old, but also accumulate all records that were received in the same millisecond.

Therefore, we would need to redesign the `ProcessFunction` a bit.

- In `processElement()`, we put the new record into the MapState and register a processing-time timer for current time + 1. This will trigger a callback to `onTimer()` when current time + 1 is reached.
- When `onTimer()` is called, we process the rows of timestamp - 1: retract all old values, accumulate all new values, and emit all rows of timestamp - 1.

The implementation of `onTimer()` can reuse most of what is currently done in `processElement()`.

Does that make sense, @rtudoran? Do you want to make the change? Otherwise, I can also do it before merging.

Sorry for noticing this only now :-/

Best, Fabian
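To make the proposed two-step flow concrete, here is a minimal sketch in plain Java. It is not the code from the pull request and uses no Flink classes. The class name `OverRangeSketch`, the `TreeMap` standing in for the MapState, the SUM aggregate, and the `precedingMillis` parameter are all assumptions made for illustration. The caller plays the role of the timer service: it fires `onTimer()` with the timestamp that `processElement()` returns, in ascending order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java stand-in for the proposed ProcessFunction; all names here are hypothetical. */
final class OverRangeSketch {
    // Stand-in for the MapState: arrival millisecond -> values of the rows that arrived then.
    private final TreeMap<Long, List<Long>> rowsByMillis = new TreeMap<>();
    private final long precedingMillis;        // assumed size of the preceding range, in ms
    private long sum = 0L;                     // SUM accumulator, chosen only for illustration
    private long lastProcessed = Long.MIN_VALUE;

    OverRangeSketch(long precedingMillis) {
        this.precedingMillis = precedingMillis;
    }

    /** Buffers the row under its arrival time and returns the timestamp to register a timer for. */
    long processElement(long currentTime, long value) {
        rowsByMillis.computeIfAbsent(currentTime, k -> new ArrayList<>()).add(value);
        return currentTime + 1;
    }

    /** Processes the rows of timerTimestamp - 1; returns one {value, aggregate} pair per emitted row. */
    List<long[]> onTimer(long timerTimestamp) {
        long rowTime = timerTimestamp - 1;
        List<Long> newRows = rowsByMillis.get(rowTime);
        // Nothing buffered for this millisecond, or the timer already fired for it.
        if (newRows == null || rowTime <= lastProcessed) {
            return Collections.emptyList();
        }
        lastProcessed = rowTime;

        // Retract every row that fell out of the range [rowTime - precedingMillis, rowTime].
        Map<Long, List<Long>> expired = rowsByMillis.headMap(rowTime - precedingMillis);
        for (List<Long> rows : expired.values()) {
            for (long v : rows) {
                sum -= v;
            }
        }
        expired.clear(); // headMap is a view, so this also removes the entries from rowsByMillis

        // Accumulate all rows of this millisecond before emitting any of them.
        for (long v : newRows) {
            sum += v;
        }

        // Every row of this millisecond is emitted with the same aggregate value.
        List<long[]> out = new ArrayList<>();
        for (long v : newRows) {
            out.add(new long[] {v, sum});
        }
        return out;
    }

    public static void main(String[] args) {
        OverRangeSketch f = new OverRangeSketch(2);
        long timer = f.processElement(10, 1);
        f.processElement(10, 2);
        for (long[] row : f.onTimer(timer)) {
            System.out.println("value=" + row[0] + " sum=" + row[1]);
        }
    }
}
```

In this sketch, two rows arriving in millisecond 10 with values 1 and 2 are both emitted with the sum 3, because accumulation finishes before anything is emitted. A real implementation would keep the buffer and the accumulator in keyed state and rely on Flink's timer service instead of the caller.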