[ https://issues.apache.org/jira/browse/FLINK-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122144#comment-15122144 ]

ASF GitHub Bot commented on FLINK-3216:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1557

    [FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library

    This PR is the first version of Flink's CEP library.
    
    The key components are the `NFA` and the `SharedBuffer`, which the `NFA`
    uses to efficiently maintain the state of multiple non-deterministic runs.
    The `NFA` implementation is strongly based on this
    [paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).
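    
    To illustrate why a shared buffer helps, here is a heavily simplified,
    hypothetical sketch (`PrefixSharingBuffer` is not a class of this PR): each
    event is stored once, and each entry points to the entry it extends, so runs
    with a common prefix reuse the same stored events instead of copying them. A
    real shared buffer additionally has to tell runs apart when they share
    entries and to discard entries that can no longer be part of a match; both
    are omitted here.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    
    /** Hypothetical sketch of prefix sharing; not the SharedBuffer of this PR. */
    final class PrefixSharingBuffer<T> {
    
        /** A stored event, the pattern state it matched, and its predecessor (null at the start). */
        record Entry<T>(String state, T event, Entry<T> previous) {}
    
        private final List<Entry<T>> entries = new ArrayList<>();
    
        /** Stores an event once; previous == null starts a new partial match. */
        Entry<T> put(String state, T event, Entry<T> previous) {
            Entry<T> entry = new Entry<>(state, event, previous);
            entries.add(entry);
            return entry;
        }
    
        /** Rebuilds one run's events (non-null) by following predecessor links back to the start. */
        List<T> extract(Entry<T> last) {
            Deque<T> events = new ArrayDeque<>();
            for (Entry<T> e = last; e != null; e = e.previous()) {
                events.addFirst(e.event());
            }
            return new ArrayList<>(events);
        }
    
        /** Number of stored events, regardless of how many runs reference them. */
        int storedEvents() {
            return entries.size();
        }
    }
    ```
    
    For example, storing `a` and then `b1` and `b2` both pointing at `a` lets
    the two runs `(a, b1)` and `(a, b2)` be extracted from three stored events
    rather than four.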
    
    To define the `NFA`, the library supports the pattern API. The pattern API
    lets you easily construct complex patterns with type and filter conditions.
    The specified pattern is then compiled into an `NFA`, which is responsible
    for detecting the patterns. See the online documentation
    (docs/libs/cep/index.md) for a full specification of the supported
    operations.
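    
    The following sketch shows, under simplifying assumptions, how a fluent
    pattern definition can be compiled into an ordered list of states.
    `MiniPattern`, `State` and `Contiguity` are hypothetical names, not the
    classes of this PR. Each state holds a name, a filter condition (in this
    sketch, repeated `where()` calls are combined with AND), and whether it must
    directly follow its predecessor (`next()`) or may follow later
    (`followedBy()`).
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;
    
    /** How a state relates to its predecessor (hypothetical). */
    enum Contiguity { STRICT, RELAXED }
    
    /** One compiled state: name, filter condition, contiguity to the previous state. */
    record State<T>(String name, Predicate<T> condition, Contiguity contiguity) {}
    
    /** Hypothetical fluent builder in the spirit of the pattern API; not Flink's Pattern. */
    final class MiniPattern<T> {
        private final List<String> names = new ArrayList<>();
        private final List<Predicate<T>> conditions = new ArrayList<>();
        private final List<Contiguity> contiguities = new ArrayList<>();
    
        private MiniPattern() {}
    
        static <T> MiniPattern<T> begin(String name) {
            // The contiguity of the first state is irrelevant.
            return new MiniPattern<T>().append(name, Contiguity.STRICT);
        }
    
        MiniPattern<T> next(String name) { return append(name, Contiguity.STRICT); }
    
        MiniPattern<T> followedBy(String name) { return append(name, Contiguity.RELAXED); }
    
        /** Attaches a filter condition to the most recently added state (ANDed). */
        MiniPattern<T> where(Predicate<T> condition) {
            int last = conditions.size() - 1;
            conditions.set(last, conditions.get(last).and(condition));
            return this;
        }
    
        private MiniPattern<T> append(String name, Contiguity contiguity) {
            names.add(name);
            conditions.add(event -> true); // accepts any event until where() is called
            contiguities.add(contiguity);
            return this;
        }
    
        /** Compiles the pattern into an ordered list of states (a linear NFA skeleton). */
        List<State<T>> compile() {
            List<State<T>> states = new ArrayList<>();
            for (int i = 0; i < names.size(); i++) {
                states.add(new State<>(names.get(i), conditions.get(i), contiguities.get(i)));
            }
            return states;
        }
    }
    ```
    
    A linear list of states is enough here only because the sketch supports
    neither `or()` nor repeated matching; alternatives would turn the compiled
    structure into a graph.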
    
    To run the `NFA`, the library adds two custom stream operators:
    `CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for
    non-keyed input streams and the latter for keyed data streams. The right
    operator is selected transparently by the `CEP.from(input, pattern)` method.
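    
    Why the keyed variant matters can be sketched in plain Java, assuming that
    keyed matching keeps independent matching state per key. The hypothetical
    `SimpleSequenceMatcher` stands in for an `NFA` and only handles strictly
    consecutive steps; `KeyedSequenceMatcher` is likewise an illustration-only
    name. With a single shared matcher, events of different keys can end up in
    the same match; with one matcher per key they cannot.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Predicate;
    
    /** Hypothetical NFA stand-in: matches strictly consecutive events only. */
    final class SimpleSequenceMatcher<T> {
        private final List<Predicate<T>> steps; // must be non-empty
        private final List<T> partial = new ArrayList<>();
    
        SimpleSequenceMatcher(List<Predicate<T>> steps) {
            this.steps = steps;
        }
    
        /** Feeds one event; returns the completed match, or null if none completes. */
        List<T> process(T event) {
            if (steps.get(partial.size()).test(event)) {
                partial.add(event);
            } else {
                // Simplified restart: the breaking event may start a new attempt,
                // but some overlapping matches are missed.
                partial.clear();
                if (steps.get(0).test(event)) {
                    partial.add(event);
                }
            }
            if (partial.size() == steps.size()) {
                List<T> match = new ArrayList<>(partial);
                partial.clear();
                return match;
            }
            return null;
        }
    }
    
    /** Keyed wrapper: one independent matcher per key, created on the key's first event. */
    final class KeyedSequenceMatcher<K, T> {
        private final Function<T, K> keySelector;
        private final List<Predicate<T>> steps;
        private final Map<K, SimpleSequenceMatcher<T>> matchers = new HashMap<>();
    
        KeyedSequenceMatcher(Function<T, K> keySelector, List<Predicate<T>> steps) {
            this.keySelector = keySelector;
            this.steps = steps;
        }
    
        List<T> process(T event) {
            K key = keySelector.apply(event);
            return matchers.computeIfAbsent(key, k -> new SimpleSequenceMatcher<>(steps))
                    .process(event);
        }
    }
    ```
    
    Feeding `1:a`, `2:a`, `1:b` into a two-step pattern (`:a` then `:b`), with
    the key taken from the prefix before the colon, the shared matcher reports
    `[2:a, 1:b]`, while the keyed one reports `[1:a, 1:b]`.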
    
    `CEP.from(input, pattern)` returns a `PatternStream`, which contains the
    matched event sequences. The event sequences can be processed by specifying
    a `PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions
    receive the detected pattern as a `Map<String, T>`, where `T` is the type of
    the input data stream. Each event is matched against a state of the pattern,
    and the name of that state is the event's key in the map.
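    
    The select contract can be illustrated without Flink. In this hypothetical
    sketch (`SelectSketch` and its methods are not part of the library), matched
    events are keyed by the name of the state they matched; a select-style
    function produces exactly one result per match, and a flat-select-style
    function may emit any number of results through a callback.
    
    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;
    import java.util.function.Function;
    
    /** Hypothetical helpers illustrating the select contract; not Flink classes. */
    final class SelectSketch {
        private SelectSketch() {}
    
        /** Keys each matched event by the name of its state, keeping state order. */
        static <T> Map<String, T> toMatchMap(List<String> stateNames, List<T> events) {
            if (stateNames.size() != events.size()) {
                throw new IllegalArgumentException("expected one event per state");
            }
            Map<String, T> match = new LinkedHashMap<>();
            for (int i = 0; i < stateNames.size(); i++) {
                match.put(stateNames.get(i), events.get(i));
            }
            return match;
        }
    
        /** Select-style: exactly one output per detected match. */
        static <T, R> List<R> select(List<Map<String, T>> matches, Function<Map<String, T>, R> fn) {
            List<R> out = new ArrayList<>();
            for (Map<String, T> m : matches) {
                out.add(fn.apply(m));
            }
            return out;
        }
    
        /** Flat-select-style: zero or more outputs per match, emitted via a callback. */
        static <T, R> List<R> flatSelect(List<Map<String, T>> matches,
                                         BiConsumer<Map<String, T>, Consumer<R>> fn) {
            List<R> out = new ArrayList<>();
            for (Map<String, T> m : matches) {
                fn.accept(m, out::add);
            }
            return out;
        }
    }
    ```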
    
    The following example shows the API in use:
    
    ```
    StreamExecutionEnvironment env = ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    DataStream<Event> input = ...
    
    // Partition the stream by event id so that patterns are matched per id
    DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
        @Override
        public Integer getKey(Event value) throws Exception {
            return value.getId();
        }
    });
    
    // "start" accepts any event, "middle" must directly follow it, and "end"
    // may occur later; the whole sequence must complete within 10 seconds
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .next("middle").where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getName().equals("name");
            }
        }).followedBy("end").where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getName().equals("critical");
            }
        }).within(Time.seconds(10));
    
    PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);
    
    // Turn each detected match into an alert; the map is keyed by state name
    DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, Event> pattern) throws Exception {
            return new Alert(pattern.get("start"), pattern.get("end"));
        }
    });
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink cep

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1557
    
----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-14T09:04:23Z

    [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
    
    Implements NFA using the SharedBuffer
    
    Implements NFACompiler to compile a Pattern into a NFA
    
    Add CEP operator
    
    Makes NFA and SharedBuffer serializable
    
    Add serializability support to SharedBuffer and NFA
    
    Add keyed cep pattern operator

commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-28T15:41:01Z

    Adds CEP documentation
    
    Adds online documentation for the CEP library

----


> Define pattern specification
> ----------------------------
>
>                 Key: FLINK-3216
>                 URL: https://issues.apache.org/jira/browse/FLINK-3216
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Till Rohrmann
>
> In order to detect event patterns we first have to define the pattern. This 
> issue tracks the progress of implementing a user-facing API to define event 
> patterns. 
> Patterns should support the following operations:
> * next(): The given event has to follow directly after the preceding event.
> * followedBy(): The given event has to follow the preceding event. Other 
> events may occur in between.
> * every(): In a followed-by relationship, a starting event can be matched with 
> multiple successive events. Consider the pattern a → b, where → denotes the 
> followed-by relationship. The event sequence a, b, b can be matched as a, b or 
> as a, (b), b, where the first b is skipped. The essential question is whether 
> a is allowed to match multiple times or only the first time; the method every 
> specifies exactly that. Every-events in a pattern can match with multiple 
> successive events. This only makes sense in a followed-by relationship, though.
> * followedByEvery(): Similar to followedBy(), except that the specified 
> element can be matched with multiple successive events.
> * or(): Alternative event which can be matched instead of the original event: 
> every("e1").where().or("e2").where()
> * within(): Defines a time interval in which the pattern has to be completed; 
> otherwise, an incomplete pattern can be emitted (timeout case).
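
The operations listed above can be approximated by a small multi-run matcher. The sketch below is hypothetical (`Step`, `Timed` and `RunMatcher` are illustration-only names, and this is not the CEP library's `NFA`): a strict step behaves like `next()`, a relaxed step like `followedBy()`, a single `every` flag applied to all relaxed steps lets a run stay open after consuming a candidate (approximating `every()`/`followedByEvery()`), and runs older than the window are moved to a timed-out list, mimicking `within()`. `or()` is not covered, and partial matches are copied rather than shared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** One step of a linear pattern; strict = next(), relaxed = followedBy(). Hypothetical. */
record Step<T>(Predicate<T> condition, boolean strict) {}

/** An event value with its event-time timestamp in milliseconds. Hypothetical. */
record Timed<T>(long timestamp, T value) {}

/** Simplified multi-run matcher; copies partial matches instead of sharing a buffer. */
final class RunMatcher<T> {
    /** A partial match: index of the step it waits for, start time, events so far. */
    private record Run<T>(int nextStep, long startTime, List<T> events) {}

    private final List<Step<T>> steps; // must be non-empty
    private final long windowMillis;
    private final boolean every;
    private final List<List<T>> timedOut = new ArrayList<>();
    private List<Run<T>> runs = new ArrayList<>();

    RunMatcher(List<Step<T>> steps, long windowMillis, boolean every) {
        this.steps = steps;
        this.windowMillis = windowMillis;
        this.every = every;
    }

    /** Processes one event (in timestamp order) and returns the matches it completes. */
    List<List<T>> process(Timed<T> event) {
        List<Run<T>> survivors = new ArrayList<>();
        List<List<T>> completed = new ArrayList<>();
        for (Run<T> run : runs) {
            // within(): detected lazily, when a later event arrives
            if (event.timestamp() - run.startTime() > windowMillis) {
                timedOut.add(run.events());
                continue;
            }
            Step<T> step = steps.get(run.nextStep());
            if (step.condition().test(event.value())) {
                List<T> events = new ArrayList<>(run.events());
                events.add(event.value());
                if (run.nextStep() + 1 == steps.size()) {
                    completed.add(events);
                } else {
                    survivors.add(new Run<>(run.nextStep() + 1, run.startTime(), events));
                }
                if (every && !step.strict()) {
                    survivors.add(run); // every: also keep waiting for later candidates
                }
            } else if (!step.strict()) {
                survivors.add(run); // followedBy(): unrelated events may occur in between
            }
            // otherwise next() requires direct succession, so the run is discarded
        }
        // Each event that satisfies the first condition starts a new run.
        if (steps.get(0).condition().test(event.value())) {
            List<T> events = new ArrayList<>();
            events.add(event.value());
            if (steps.size() == 1) {
                completed.add(events);
            } else {
                survivors.add(new Run<>(1, event.timestamp(), events));
            }
        }
        runs = survivors;
        return completed;
    }

    /** Partial matches dropped because they exceeded the window. */
    List<List<T>> timedOut() {
        return timedOut;
    }
}
```

With `a` at time 0 and `b` at times 1 and 2, a window of 10 and a relaxed `b` step, the matcher reports one match with `every = false` and two with `every = true`, which corresponds to the a, b versus a, (b), b discussion above. If `b` only arrives at time 20, the partial match `[a]` ends up in `timedOut()` instead.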



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
