GitHub user haoch opened a pull request: https://github.com/apache/flink/pull/2487
[FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed # Abstraction Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries. __It would be very helpful for flink users (especially streaming application developer) to provide a library to run Siddhi CEP query directly in Flink streaming application.__ # Features * Integrate Siddhi CEP as an stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like * Filter * Join * Aggregation * Group by * Having * Window * Conditions and Expressions * Pattern processing * Sequence processing * Event Tables ... * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`) * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`) * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`) # Test Cases * [`org.apache.flink.contrib.siddhi. SiddhiCEPITCase `](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java) # Example StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp"); cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp"); DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep .from("inputStream1").union("inputStream2") .sql( "from every s1 = inputStream1[id == 2] " + " -> s2 = inputStream2[id == 3] " + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price" + "insert into outputStream" ) .returns("outputStream"); env.execute(); You can merge this pull request into a Git repository by running: $ git pull https://github.com/haoch/flink FLINK-4520 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2487.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2487 ---- commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0 Author: Hao Chen <hch...@ebay.com> Date: 2016-08-29T12:34:42Z Implement initial version of flink-siddhi stream operator commit d8b131d9204e9e359afae80087afe2aecab27eaf Author: Chen, Hao <h...@apache.org> Date: 2016-08-30T14:33:59Z Implement SiddhiStream API and DSL commit 56c150fadc54d06c186223e43a6fd9ac74eee837 Author: Chen, Hao <h...@apache.org> Date: 2016-08-30T18:31:18Z Reformat code following checkstyle commit 138b0833715a7d110ddb4562e83fb6f0d5338fea Author: Chen, Hao <h...@apache.org> Date: 2016-08-31T17:41:52Z Add SiddhiTypeUtils and support stream as typed tuple commit 2484c998bd4def5330012bd6797fb807939752b7 Author: Chen, Hao <h...@apache.org> Date: 2016-09-07T17:10:03Z Support siddhi stream union commit 0a12f7d3ddad1d0fa7a216524212f777d722a3f7 Author: Chen, Hao <h...@apache.org> Date: 2016-09-07T17:40:31Z Support siddhi extension and registery commit 9dc425960b212e426e50683778aa3939492669e2 Author: Chen, Hao <h...@apache.org> Date: 2016-09-07T18:02:55Z Fix import checkstyle commit e618bb2774bd0d6af77d11f65c2200cf18253d1f Author: Chen, Hao <h...@apache.org> Date: 2016-09-08T15:29:45Z Refactor registerExtension interface commit 13d76b6c16ca538ff8113352310f9568035eb92a Author: Chen, Hao <h...@apache.org> Date: 2016-09-08T17:55:41Z Decouple SiddhiCEP API to environment instance isolation level commit 26498dee5fefd6520701f6ee23441968642656a8 Author: Chen, Hao <h...@apache.org> Date: 2016-09-08T18:00:19Z Add unit test testTriggerUndefinedStreamException commit 23dabddf1f20665161861c6757c48c5ae0b0bb08 Author: Chen, Hao <h...@apache.org> Date: 2016-09-08T18:22:18Z Decouple data stream converter with streamId commit 8bbf6f3417e0c177638301c18379c8aadc3746ee Author: Chen, Hao <h...@apache.org> Date: 2016-09-09T08:16:51Z Sorting buffer queue state persistence and restore logic, support return Map<String,Object> commit fcb4a2c85abd604965f2e375d767f6dbd2711222 Author: Chen, Hao <h...@apache.org> Date: 2016-09-09T09:28:24Z Fix checkstyle error commit 466aafca86899c89b7b6e6d25f844880f9513c51 Author: Chen, Hao <h...@apache.org> Date: 2016-09-09T17:34:11Z Reuse StreamRecord object while collecting output commit f7f9289557b1da3304ea3d43f7d2441ff314a755 Author: Chen, Hao <h...@apache.org> Date: 2016-09-09T18:34:30Z Add flink-sddhi doc in package-info.java commit ef558ce668b8403505afbaad01bfafa475e60037 Author: Chen, Hao <h...@apache.org> Date: 2016-09-10T02:21:22Z Merge branch 'master' of https://github.com/apache/flink into FLINK-4520-fix commit 4cd3ea7646ecdc5c22fa85ae9953675b9448b044 Author: Chen, Hao <h...@apache.org> Date: 2016-09-10T02:26:53Z Refactor siddhi stater snapshot/restore according to new operator state interface ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---