GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1557
[FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library

This PR is the first version of Flink's CEP library. The key components are the `NFA`, which uses the `SharedBuffer` to efficiently maintain the state of multiple non-deterministic runs. The `NFA` implementation is strongly based on this [paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).

To define the `NFA`, the library provides the pattern API. The pattern API lets you easily construct complex patterns with type and filter conditions. The specified pattern is then compiled into an `NFA`, which is responsible for detecting the patterns. See the online documentation for a full specification of the supported operations (docs/libs/cep/index.md).

To run the `NFA`, the library adds two custom stream operators: `CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for non-keyed input streams and the latter for keyed data streams. The right operator is selected transparently via the `CEP.from(input, pattern)` method.

`CEP.from(input, pattern)` returns a `PatternStream` which contains the matched event sequences. The event sequences can be processed by specifying a `PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive the detected pattern as a `Map<String, T>`, where `T` is the type of the input data stream. Each event is matched against a state of the pattern, and the name of that state is the event's key in the map.

An example of the API can be seen next:

```
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

// Key the stream by event id so that patterns are detected per key
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

// "start", directly followed by "middle", eventually followed by "end",
// all within 10 seconds
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("name");
        }
    }).followedBy("end").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);

// The map keys are the state names used in the pattern definition
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> pattern) throws Exception {
        return new Alert(pattern.get("start"), pattern.get("end"));
    }
});
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink cep

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1557

----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-14T09:04:23Z

    [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

    Implements NFA using the SharedBuffer

    Implements NFACompiler to compile a Pattern into a NFA

    Add CEP operator

    Makes NFA and SharedBuffer serializable

    Add serializability support to SharedBuffer and NFA

    Add keyed cep pattern operator

commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-28T15:41:01Z

    Adds CEP documentation

    Adds online documentation for the CEP library

----

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
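To make the matching semantics described in the PR text more concrete, here is a minimal, Flink-independent sketch of how several non-deterministic runs can be tracked and how a match ends up as a `Map<String, T>` keyed by state name. It is not the `NFA` or `SharedBuffer` implementation from this PR. The names `PatternSketch`, `Stage`, `Run` and `match` are hypothetical, the `within` time window is left out, and each run copies its own map instead of sharing state. It only assumes that `next` means "the very next event must match" and `followedBy` means "other events may occur in between".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration only; not part of the Flink CEP library.
final class PatternSketch {

    /** A named pattern state. strict = true models next(), false models followedBy(). */
    static final class Stage<T> {
        final String name;
        final Predicate<T> condition;
        final boolean strict;

        Stage(String name, Predicate<T> condition, boolean strict) {
            this.name = name;
            this.condition = condition;
            this.strict = strict;
        }
    }

    /** A partial match: the index of the stage it waits for and the events bound so far. */
    static final class Run<T> {
        final int stage;
        final Map<String, T> bound;

        Run(int stage, Map<String, T> bound) {
            this.stage = stage;
            this.bound = bound;
        }
    }

    /** Returns every complete match as a map from state name to matched event. */
    static <T> List<Map<String, T>> match(List<Stage<T>> stages, List<T> events) {
        List<Map<String, T>> matches = new ArrayList<Map<String, T>>();
        List<Run<T>> runs = new ArrayList<Run<T>>();
        for (T event : events) {
            // Every event may start a fresh run at the first stage.
            runs.add(new Run<T>(0, new LinkedHashMap<String, T>()));
            List<Run<T>> survivors = new ArrayList<Run<T>>();
            for (Run<T> run : runs) {
                Stage<T> stage = stages.get(run.stage);
                if (stage.condition.test(event)) {
                    // "Take" branch: bind the event under the state's name.
                    Map<String, T> bound = new LinkedHashMap<String, T>(run.bound);
                    bound.put(stage.name, event);
                    if (run.stage + 1 == stages.size()) {
                        matches.add(bound);
                    } else {
                        survivors.add(new Run<T>(run.stage + 1, bound));
                    }
                }
                // "Ignore" branch: a run waiting on a followedBy stage keeps waiting,
                // whether or not this event matched. Runs waiting on a strict stage,
                // and unmatched fresh runs, are dropped.
                if (!stage.strict && run.stage > 0) {
                    survivors.add(run);
                }
            }
            runs = survivors;
        }
        return matches;
    }
}
```

With stages `start` (any event, strict), `middle` (equals `"name"`, strict) and `end` (equals `"critical"`, relaxed), the input `a, name, b, critical` yields a single match that binds `start` to `a`, `middle` to `name` and `end` to `critical`; the unrelated `b` is skipped because `end` is a `followedBy` stage.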