[ https://issues.apache.org/jira/browse/FLINK-3216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122144#comment-15122144 ]
ASF GitHub Bot commented on FLINK-3216:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1557

[FLINK-3216] [FLINK-3217] [cep] Initial version of CEP library

This PR is the first version of Flink's CEP library. The key component is the `NFA`, which uses the `SharedBuffer` to efficiently maintain the state of multiple non-deterministic runs. The `NFA` implementation is strongly based on this [paper](https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf).

To define the `NFA`, the library provides the pattern API. The pattern API lets you easily construct complex patterns with type and filter conditions. The specified pattern is then compiled into an `NFA`, which is responsible for detecting the pattern. See the online documentation for a full specification of the supported operations (docs/libs/cep/index.md).

To run the `NFA`, the library adds two custom stream operators: `CEPPatternOperator` and `KeyedCEPPatternOperator`. The former is used for non-keyed input streams and the latter for keyed data streams. The right operator is selected transparently via the `CEP.from(input, pattern)` method.

`CEP.from(input, pattern)` returns a `PatternStream` which contains the matched event sequences. The event sequences can be processed by specifying a `PatternSelectFunction` or a `PatternFlatSelectFunction`. Both functions receive the detected pattern as a `Map<String, T>`, where `T` is the type of the input data stream. Each event is matched against a state of the pattern, and the name of that state is the event's key in the map.

The following example illustrates the API:

```
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Event> input = ...

// Key the stream by event id so that patterns are detected per key
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
    @Override
    public Integer getKey(Event value) throws Exception {
        return value.getId();
    }
});

// "start", directly followed by "middle", eventually followed by "end",
// all within 10 seconds
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("middle").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("name");
        }
    }).followedBy("end").where(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("critical");
        }
    }).within(Time.seconds(10));

PatternStream<Event> patternStream = CEP.from(partitionedInput, pattern);

// The map is keyed by the state names used in the pattern definition
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, Event> pattern) throws Exception {
        return new Alert(pattern.get("start"), pattern.get("end"));
    }
});
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink cep

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1557

----
commit 5deda066c5f4851a426b15a05994be0796f5e6f3
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-14T09:04:23Z

    [FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition

    Implements NFA using the SharedBuffer

    Implements NFACompiler to compile a Pattern into a NFA

    Add CEP operator

    Makes NFA and SharedBuffer serializable

    Add serializability support to SharedBuffer and NFA

    Add keyed cep pattern operator

commit 6459946a62985e08c50ca397447af42662ec6558
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-01-28T15:41:01Z

    Adds CEP documentation

    Adds online documentation for the CEP library

----

> Define pattern specification
> ----------------------------
>
> Key: FLINK-3216
> URL:
https://issues.apache.org/jira/browse/FLINK-3216
> Project: Flink
> Issue Type: Sub-task
> Reporter: Till Rohrmann
>
> In order to detect event patterns we first have to define the pattern. This
> issue tracks the progress of implementing a user-facing API to define event
> patterns.
> Patterns should support the following operations:
> * next(): The given event has to follow directly after the preceding event.
> * followedBy(): The given event has to follow the preceding event. Other
> events might occur in between.
> * every(): In a follow-by relationship, a starting event can be matched with
> multiple successive events. Consider the pattern a → b, where → denotes the
> follow-by relationship. The event sequence a, b, b can be matched as a, b or
> as a, (b), b, where the first b is left out. The essential question is
> whether a is allowed to match multiple times or only the first time. The
> method every specifies exactly that: an event marked with every can match
> with multiple successive events. This only makes sense in a follow-by
> relationship, though.
> * followedByEvery(): Similar to followedBy, except that the specified element
> can be matched with multiple successive events.
> * or(): Alternative event which can be matched instead of the original event:
> every("e1").where().or("e2").where()
> * within(): Defines a time interval in which the pattern has to be completed;
> otherwise, an incomplete pattern can be emitted (timeout case).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
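
The difference between next(), followedBy() and every() described in the issue can be made concrete with a small, Flink-independent sketch. The class `ContiguitySketch`, its methods `matchNext` and `matchFollowedBy`, and the boolean `every` flag are hypothetical names introduced only for illustration. They are not part of the CEP library, and the brute-force loops stand in for the `NFA` and `SharedBuffer` used by the actual implementation. Each match is reported as a pair of positions in the event sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: brute-force matching of two-event patterns over a list
// of event names. This is an assumption-laden sketch, not the Flink CEP API.
final class ContiguitySketch {

    // next(): the second event has to follow directly after the first one.
    static List<int[]> matchNext(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }

    // followedBy(): other events may occur in between. With every == true the
    // starting event may be matched with each later occurrence of the second
    // event; with every == false only with the earliest one.
    static List<int[]> matchFollowedBy(List<String> events, String first,
                                       String second, boolean every) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(new int[] {i, j});
                    if (!every) {
                        break; // stop at the first successor
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // The sequence a, b, b from the issue description.
        List<String> events = Arrays.asList("a", "b", "b");
        System.out.println("next: "
            + matchNext(events, "a", "b").size());
        System.out.println("followedBy: "
            + matchFollowedBy(events, "a", "b", false).size());
        System.out.println("followedBy with every: "
            + matchFollowedBy(events, "a", "b", true).size());
    }
}
```

For the sequence a, b, b, this sketch reports one match for next and for plain followedBy (a with the first b), and two matches once every is enabled (a, b and a, (b), b). For a sequence such as a, c, b, next finds nothing while followedBy still matches a with b.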