GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1557

    [FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library

    This PR is the first version of Flink's CEP library.
    
    The key components are the `NFA` and the `SharedBuffer`, which the `NFA` 
uses to efficiently maintain the state of multiple non-deterministic runs. The 
`NFA` implementation is closely based on this 
[paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).
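    
    To illustrate what "multiple non-deterministic runs" means, here is a 
minimal, self-contained sketch. It is not the `NFA` class from this PR and it 
does not use a `SharedBuffer`: every run copies its own partial match, which is 
exactly the duplication a shared buffer is meant to avoid. The class name 
`NfaRunsSketch`, the string events, and the three hard-coded states are 
assumptions made for illustration. The sketch also assumes that a run waiting 
for the last state survives non-matching events, and it omits time windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not Flink's NFA and no SharedBuffer.
final class NfaRunsSketch {
    // State names of an assumed three-step pattern.
    static final String[] STATES = {"start", "middle", "end"};

    // Condition of each state; events are plain strings in this sketch.
    static boolean accepts(int state, String event) {
        switch (state) {
            case 0:  return true;                     // "start" takes any event
            case 1:  return event.equals("name");     // "middle"
            default: return event.equals("critical"); // "end"
        }
    }

    // Feeds the events through all active runs and returns completed matches.
    static List<Map<String, String>> match(List<String> events) {
        List<List<String>> runs = new ArrayList<>();
        List<Map<String, String>> matches = new ArrayList<>();
        for (String event : events) {
            List<List<String>> nextRuns = new ArrayList<>();
            for (List<String> run : runs) {
                int state = run.size(); // index of the state this run waits for
                if (accepts(state, event)) {
                    // Each run keeps a private copy of its partial match.
                    List<String> extended = new ArrayList<>(run);
                    extended.add(event);
                    if (extended.size() == STATES.length) {
                        matches.add(toMap(extended));
                    } else {
                        nextRuns.add(extended);
                    }
                }
                // Assumption: a run waiting for "end" also stays alive, so it
                // may skip this event. Runs waiting for "middle" are dropped
                // when the event does not match.
                if (state == STATES.length - 1) {
                    nextRuns.add(run);
                }
            }
            // Every event may also begin a new run.
            if (accepts(0, event)) {
                nextRuns.add(new ArrayList<>(Arrays.asList(event)));
            }
            runs = nextRuns; // no time window here, so old runs are never pruned
        }
        return matches;
    }

    // Keys each matched event by the name of the state it was matched against.
    static Map<String, String> toMap(List<String> run) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < run.size(); i++) {
            result.put(STATES[i], run.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(match(Arrays.asList("a", "name", "x", "critical")));
    }
}
```

    For the input `a, name, x, critical` the sketch reports one match that maps 
`start` to `a`, `middle` to `name` and `end` to `critical`. A second `critical` 
event would complete the surviving run again and yield a second match.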
    
    In order to define the `NFA`, the library supports the pattern API. The 
pattern API lets you easily construct complex patterns with type and filter 
conditions. The specified pattern is then compiled into an `NFA` which is 
responsible for detecting the patterns. See the online documentation for a full 
specification of the supported operations (docs/libs/cep/index.md).
    
    In order to run the `NFA`, the library adds two custom stream operators: 
`CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for 
non-keyed input streams and the latter for keyed data streams. The selection of 
the right operator is done transparently via the `CEP.from(input, pattern)` 
method.
    
    `CEP.from(input, pattern)` returns a `PatternStream` which contains the 
matched event sequences. The event sequences can be processed by specifying a 
`PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive 
the detected pattern as a `Map<String, T>` where `T` is the type of the input 
data stream. Each event is matched against a state of the pattern and the name 
of the state is the key of the map.
    
    The following example shows the API:
    
    ```
    StreamExecutionEnvironment env = ...
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
    DataStream<Event> input = ...
    
    DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
        @Override
        public Integer getKey(Event value) throws Exception {
            return value.getId();
        }
    });
    
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
        .next("middle").where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getName().equals("name");
            }
        }).followedBy("end").where(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.getName().equals("critical");
            }
        }).within(Time.seconds(10));
    
    PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);
    
    DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
        @Override
        public Alert select(Map<String, Event> pattern) throws Exception {
            return new Alert(pattern.get("start"), pattern.get("end"));
        }
    });
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink cep

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1557
    
----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-14T09:04:23Z

    [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition
    
    Implements NFA using the SharedBuffer
    
    Implements NFACompiler to compile a Pattern into a NFA
    
    Add CEP operator
    
    Makes NFA and SharedBuffer serializable
    
    Add serializability support to SharedBuffer and NFA
    
    Add keyed cep pattern operator

commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-28T15:41:01Z

    Adds CEP documentation
    
    Adds online documentation for the CEP library

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
