[ https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420531#comment-15420531 ]
ASF GitHub Bot commented on FLINK-3703: --------------------------------------- Github user smarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2367#discussion_r74711517 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java --- @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) { edge.getTarget(), edge.getVersion(), copy)); + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) { + cleanUp.add(edge.getTarget()); + } } } } } } } + // Remove shared buffer entries to maintain correct matching behaviour + doCleanUp(new Predicate<K, V>() { + + @Override + public boolean toRemove(SharedBufferEntry<K, V> entry) { + return cleanUp.contains(entry); + } + }); + // Remove all entries that are dependent on the current event + if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) { + doCleanUp(new Predicate<K, V>() { + + @Override + public boolean toRemove(SharedBufferEntry<K, V> entry) { + if (entry == null) { + return false; + } + return entry.getValueTime().value == value + && entry.getValueTime().timestamp == timestamp; + } + }); + } + return result; } + private void doCleanUp(Predicate<K, V> predicate) { + ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>(); + for (SharedBufferPage<K, V> page : this.pages.values()) { + for (SharedBufferEntry<K, V> entry : page.getEntries()) { + if (entry.getReferenceCounter() <= 1) { + doRecursiveCleanup(entry, predicate, toRemove); + } + } + } + + for (SharedBufferEntry<K, V> startNode: toRemove) { + release(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp()); + remove(startNode.page.getKey(), startNode.getValueTime().value, startNode.getValueTime().getTimestamp()); + } + } + + private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode, Predicate<K, V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) { --- End diff -- Replace ArrayList by List in the arguments, unless we need it 
to be ArrayList explicitly.

> Add sequence matching semantics to discard matched events
> ---------------------------------------------------------
>
> Key: FLINK-3703
> URL: https://issues.apache.org/jira/browse/FLINK-3703
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Till Rohrmann
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple
> matching sequences or not. Currently, the default is that an event can
> participate in multiple matching sequences. E.g. if you have the pattern
> {{Pattern.<Event>begin("a").followedBy("b")}} and the input event stream
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}}
> and {{Event("B"), Event("C")}}.
> It would be useful to allow the user to define where the matching algorithm
> should continue after a matching sequence has been found. Possible option
> values could be
> * {{from first}} - continue keeping all events for future matches (that is
> the current behaviour)
> * {{after first}} - continue after the first element (remove first matching
> event and continue with the second event)
> * {{after last}} - continue after the last element (effectively discarding
> all elements of the matching sequence)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)