Dawid Wysakowicz created FLINK-6290: ---------------------------------------
             Summary: SharedBuffer is improperly released when multiple edges between entries
                 Key: FLINK-6290
                 URL: https://issues.apache.org/jira/browse/FLINK-6290
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.0
            Reporter: Dawid Wysakowicz
            Priority: Critical
             Fix For: 1.3.0

The test below currently fails:

{code}
@Test
public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
    SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
    int numberEvents = 8;
    Event[] events = new Event[numberEvents];
    final long timestamp = 1L;

    for (int i = 0; i < numberEvents; i++) {
        events[i] = new Event(i + 1, "e" + (i + 1), i);
    }

    sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
    sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
    sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
    sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
    sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
    sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

    //simulate IGNORE (next event can point to events[2])
    sharedBuffer.lock("branching", events[2], timestamp);

    sharedBuffer.release("branching", events[4], timestamp);

    //There should be still events[1] and events[2] in the buffer
    assertFalse(sharedBuffer.isEmpty());
}
{code}

The problem is in the {{SharedBuffer#internalRemove}} method:

{code}
private void internalRemove(final SharedBufferEntry<K, V> entry) {
    Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
    entriesToRemove.add(entry);

    while (!entriesToRemove.isEmpty()) {
        SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();

        if (currentEntry.getReferenceCounter() == 0) {
            currentEntry.remove();

            for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
                if (edge.getTarget() != null) {
                    edge.getTarget().decreaseReferenceCounter();
                    entriesToRemove.push(edge.getTarget());
                }
            }
        }
    }
}
{code}

When {{currentEntry}} has multiple edges to the same entry, that entry is pushed onto {{entriesToRemove}} twice. Once its reference counter reaches 0, it is popped and processed twice, so its own edges are removed twice.
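
To make the failure mode easier to follow, here is a minimal, self-contained sketch that models the same traversal without any Flink classes. Everything in it is hypothetical: {{Entry}}, {{buildScenario}}, {{naiveRemove}} and {{guardedRemove}} are illustration-only names, and the "skip entries that were already removed" guard is just one possible remedy, not necessarily the fix that will be applied to {{SharedBuffer}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DuplicateEdgeSketch {

    /** Hypothetical stand-in for a shared buffer entry; not Flink's SharedBufferEntry. */
    static final class Entry {
        final String name;
        final List<Entry> targets = new ArrayList<>(); // one element per edge, duplicates allowed
        int refCount;
        boolean removed;

        Entry(String name) {
            this.name = name;
        }

        /** Adds an edge to a predecessor and counts the new reference on it. */
        void pointTo(Entry target) {
            targets.add(target);
            target.refCount++;
        }
    }

    /** Builds the graph from the failing test: e4 has two edges to e3, and e2 is locked. */
    static Entry[] buildScenario() {
        Entry e1 = new Entry("e1");
        Entry e2 = new Entry("e2");
        Entry e3 = new Entry("e3");
        Entry e4 = new Entry("e4");
        e2.pointTo(e1);
        e3.pointTo(e1);
        e3.pointTo(e2);
        e4.pointTo(e3);
        e4.pointTo(e3); // second edge between the same two entries
        e2.refCount++;  // models sharedBuffer.lock(...) on events[2]
        return new Entry[] {e1, e2, e3, e4};
    }

    /** Mirrors the traversal quoted in this issue: a target may be pushed and processed twice. */
    static void naiveRemove(Entry start) {
        Deque<Entry> entriesToRemove = new ArrayDeque<>();
        entriesToRemove.push(start);
        while (!entriesToRemove.isEmpty()) {
            Entry current = entriesToRemove.pop();
            if (current.refCount == 0) {
                current.removed = true;
                for (Entry target : current.targets) {
                    target.refCount--;
                    entriesToRemove.push(target);
                }
            }
        }
    }

    /** Assumed remedy: identical traversal, but an entry that was already removed is skipped. */
    static void guardedRemove(Entry start) {
        Deque<Entry> entriesToRemove = new ArrayDeque<>();
        entriesToRemove.push(start);
        while (!entriesToRemove.isEmpty()) {
            Entry current = entriesToRemove.pop();
            if (current.refCount == 0 && !current.removed) {
                current.removed = true;
                for (Entry target : current.targets) {
                    target.refCount--;
                    entriesToRemove.push(target);
                }
            }
        }
    }

    public static void main(String[] args) {
        Entry[] naive = buildScenario();
        naiveRemove(naive[3]); // models releasing events[4]
        System.out.println("naive:   e2 removed=" + naive[1].removed + ", e1 refCount=" + naive[0].refCount);

        Entry[] guarded = buildScenario();
        guardedRemove(guarded[3]);
        System.out.println("guarded: e2 removed=" + guarded[1].removed + ", e1 refCount=" + guarded[0].refCount);
    }
}
```

In this model the naive traversal processes e3 twice, which decrements the counters of e1 and e2 one time too many: the locked e2 ends up removed and e1 is left with a negative counter. With the guard, e1 and e2 both survive, which is what the test above expects for events[1] and events[2].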