Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5141#discussion_r159057128 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -191,22 +191,28 @@ public boolean isEmpty() { */ public boolean prune(long pruningTimestamp) { Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); - boolean pruned = false; + List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>(); while (iter.hasNext()) { SharedBufferPage<K, V> page = iter.next().getValue(); - if (page.prune(pruningTimestamp)) { - pruned = true; - } + page.prune(pruningTimestamp, prunedEntries); if (page.isEmpty()) { // delete page if it is empty iter.remove(); } } - return pruned; + if (!prunedEntries.isEmpty()) { + for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) { + entry.getValue().removeEdges(prunedEntries); + } + prunedEntries.clear(); --- End diff -- As prunedEntries is local now, you don't need to clear it any longer.
---