GitHub user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5141#discussion_r159057128
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---
    @@ -191,22 +191,28 @@ public boolean isEmpty() {
         */
        public boolean prune(long pruningTimestamp) {
                Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
    -           boolean pruned = false;
    +           List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
     
                while (iter.hasNext()) {
                        SharedBufferPage<K, V> page = iter.next().getValue();
     
    -                   if (page.prune(pruningTimestamp)) {
    -                           pruned = true;
    -                   }
    +                   page.prune(pruningTimestamp, prunedEntries);
     
                        if (page.isEmpty()) {
                                // delete page if it is empty
                                iter.remove();
                        }
                }
     
    -           return pruned;
    +           if (!prunedEntries.isEmpty()) {
    +                   for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
    +                           entry.getValue().removeEdges(prunedEntries);
    +                   }
    +                   prunedEntries.clear();
    --- End diff --
    
    Since `prunedEntries` is now a local variable, the `prunedEntries.clear()` call is no longer needed.
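
    To illustrate the point, here is a minimal sketch, not the actual Flink code. `PruneSketch`, its `String`/`Long` page layout, the `<=` comparison, and the return value are all assumptions made up for this example. It only shows that a list created inside `prune` becomes unreachable when the method returns, so clearing it first has no observable effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for SharedBuffer; not Flink's real class. */
public class PruneSketch {
    // Assumed layout: key -> timestamps stored on that "page".
    private final Map<String, List<Long>> pages = new HashMap<>();

    public void put(String key, long timestamp) {
        pages.computeIfAbsent(key, k -> new ArrayList<>()).add(timestamp);
    }

    public int pageCount() {
        return pages.size();
    }

    /** Drops entries with timestamp <= pruningTimestamp (assumed semantics). */
    public boolean prune(long pruningTimestamp) {
        // Local list: a fresh instance is created on every call.
        List<Long> prunedEntries = new ArrayList<>();

        Iterator<Map.Entry<String, List<Long>>> iter = pages.entrySet().iterator();
        while (iter.hasNext()) {
            List<Long> page = iter.next().getValue();

            Iterator<Long> entries = page.iterator();
            while (entries.hasNext()) {
                Long ts = entries.next();
                if (ts <= pruningTimestamp) {
                    prunedEntries.add(ts);
                    entries.remove();
                }
            }

            if (page.isEmpty()) {
                // delete page if it is empty
                iter.remove();
            }
        }

        // No prunedEntries.clear() here: the list goes out of scope on return.
        return !prunedEntries.isEmpty();
    }
}
```

    A `clear()` would only matter if the list were a field reused across calls, which appears to be what the earlier revision did.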

