[ 
https://issues.apache.org/jira/browse/FLINK-3703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420531#comment-15420531
 ] 

ASF GitHub Bot commented on FLINK-3703:
---------------------------------------

Github user smarthi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2367#discussion_r74711517
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
    @@ -256,16 +267,75 @@ public void prune(long pruningTimestamp) {
                                                                                
edge.getTarget(),
                                                                                
edge.getVersion(),
                                                                                
copy));
    +                                                           if 
(matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +                                                                   
cleanUp.add(edge.getTarget());
    +                                                           }
                                                        }
                                                }
                                        }
                                }
                        }
                }
     
    +           // Remove shared buffer entries to maintain correct matching 
behaviour
    +           doCleanUp(new Predicate<K, V>() {
    +
    +                   @Override
    +                   public boolean toRemove(SharedBufferEntry<K, V> entry) {
    +                           return cleanUp.contains(entry);
    +                   }
    +           });
    +           // Remove all entries that are dependent on the current event
    +           if (matchingBehaviour == MatchingBehaviour.AFTER_LAST) {
    +                   doCleanUp(new Predicate<K, V>() {
    +
    +                           @Override
    +                           public boolean toRemove(SharedBufferEntry<K, V> 
entry) {
    +                                   if (entry == null) {
    +                                           return false;
    +                                   }
    +                                   return entry.getValueTime().value == 
value
    +                                           && 
entry.getValueTime().timestamp == timestamp;
    +                           }
    +                   });
    +           }
    +
                return result;
        }
     
    +   private void doCleanUp(Predicate<K, V> predicate) {
    +           ArrayList<SharedBufferEntry<K, V>> toRemove = new ArrayList<>();
    +           for (SharedBufferPage<K, V> page : this.pages.values()) {
    +                   for (SharedBufferEntry<K, V> entry : page.getEntries()) 
{
    +                           if (entry.getReferenceCounter() <= 1) {
    +                                   doRecursiveCleanup(entry, predicate, 
toRemove);
    +                           }
    +                   }
    +           }
    +
    +           for (SharedBufferEntry<K, V> startNode: toRemove) {
    +                   release(startNode.page.getKey(), 
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +                   remove(startNode.page.getKey(), 
startNode.getValueTime().value, startNode.getValueTime().getTimestamp());
    +           }
    +   }
    +
    +   private boolean doRecursiveCleanup(SharedBufferEntry<K, V> startNode, 
Predicate<K, V> cleanUp, ArrayList<SharedBufferEntry<K, V>> toRemove) {
    --- End diff --
    
    Replace ArrayList by List in the arguments, unless we need it to be 
ArrayList explicitly.


> Add sequence matching semantics to discard matched events
> ---------------------------------------------------------
>
>                 Key: FLINK-3703
>                 URL: https://issues.apache.org/jira/browse/FLINK-3703
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> There is no easy way to decide whether events can be part of multiple 
> matching sequences or not. Currently, the default is that an event can 
> participate in multiple matching sequences. E.g. if you have the pattern 
> {{Pattern.<Event>begin("a").followedBy("b")}} and the input event stream 
> {{Event("A"), Event("B"), Event("C")}}, then you will generate the following 
> matching sequences: {{Event("A"), Event("B")}}, {{Event("A"), Event("C")}} 
> and {{Event("B"), Event("C")}}. 
> It would be useful to allow the user to define where the matching algorithm 
> should continue after a matching sequence has been found. Possible option 
> values could be 
>  * {{from first}} - continue keeping all events for future matches (that is 
> the current behaviour) 
>  * {{after first}} -  continue after the first element (remove first matching 
> event and continue with the second event)
>  * {{after last}} - continue after the last element (effectively discarding 
> all elements of the matching sequence)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to