Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185749819 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) { @Override public void registerEventTimeTimer(N namespace, long time) { - InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer); - if (timerSet.add(timer)) { + InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace, + this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1); + Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey()); + InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer); + if (prev == null) { --- End diff -- What happens if we find a `prev != null` that was marked as deleted? Looks like no timer will be inserted even though it should.
---