Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185749320 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) { @Override public void registerEventTimeTimer(N namespace, long time) { - InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer); - if (timerSet.add(timer)) { + InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace, + this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1); + Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey()); + InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer); + if (prev == null) { eventTimeTimersQueue.add(timer); } } @Override public void deleteProcessingTimeTimer(N namespace, long time) { - InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer); - if (timerSet.remove(timer)) { + Map<String, InternalTimer<K, N>> timerMap = getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey()); + String key = InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), namespace.toString(), time); + InternalTimer<K, N> timer = timerMap.get(key); + if (timer != null) { + timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue()); processingTimeTimersQueue.remove(timer); } + this.knInternalTimeServiceManager.getReadLock().lock(); + try { + if (this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) { + timerMap.remove(key); --- End diff -- This looks like it could take a very long time (until the timer triggers) 
until a timer is truly removed, if the removal happened while a snapshot was in progress? This could potentially accumulate a lot of deleted timers.
---