[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485176#comment-15485176 ]
ASF GitHub Bot commented on FLINK-4520: --------------------------------------- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78446790 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext; +import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory; +import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Siddhi CEP API Interface + */ +@PublicEvolving +public abstract class SiddhiStream { + private final SiddhiCEP environment; + + public SiddhiStream(SiddhiCEP environment) { --- End diff -- Could we name this `cepEnvironment`? > Integrate Siddhi as a lightweight CEP Library > --------------------------------------------- > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP > Affects Versions: 1.2.0 > Reporter: Hao Chen > Labels: cep, library, patch-available > Fix For: 1.2.0 > > > h1. flink-siddhi proposal > h2. Abstraction > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user specified queries. > It would be very helpful for flink users (especially streaming application > developer) to provide a library to run Siddhi CEP query directly in Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as an stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO,Tuple, Primitive Type, etc. > * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)