Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6062#discussion_r191711210

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -199,17 +186,9 @@ public long currentWatermark() {
         @Override
         public void registerProcessingTimeTimer(N namespace, long time) {
    -        InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -
    -        // make sure we only put one timer per key into the queue
    -        Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
    -        if (timerSet.add(timer)) {
    -
    -            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    +        if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) {
    --- End diff --

    Good point, I would suggest we do this in another PR.
---