[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15479103#comment-15479103 ]
ASF GitHub Bot commented on FLINK-4520: --------------------------------------- Github user apivovarov commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78270757 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java --- @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.siddhi; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Siddhi CEP Execution Environment + */ +@PublicEvolving +public class SiddhiCEP { + private final StreamExecutionEnvironment executionEnvironment; + private final Map<String, DataStream<?>> dataStreams; + private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas; + private final Map<String,Class<?>> extensions = new HashMap<>(); + + public Map<String, DataStream<?>> getDataStreams(){ + return this.dataStreams; + } + + public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){ + return this.dataStreamSchemas; + } + + public boolean isStreamDefined(String streamId){ + return dataStreams.containsKey(streamId); + } + + public Map<String,Class<?>> getExtensions(){ + return this.extensions; + } + + public void checkStreamDefined(String streamId) throws UndefinedStreamException { + if(!isStreamDefined(streamId)){ + throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined"); + } + } + + public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) { + this.executionEnvironment = streamExecutionEnvironment; + this.dataStreams = new HashMap<>(); + this.dataStreamSchemas = new HashMap<>(); + } + + public static <T> SiddhiStream.SingleSiddhiStream<T> define(String streamId, DataStream<T> inStream, String... 
fieldNames) { + SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment()); + return environment.from(streamId,inStream,fieldNames); + } + + public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId, DataStream<T> inStream, String... fieldNames){ + this.registerStream(streamId,inStream,fieldNames); + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public <T> SiddhiStream.SingleSiddhiStream<T> from(String streamId){ + return new SiddhiStream.SingleSiddhiStream<>(streamId, this); + } + + public <T> SiddhiStream.UnionSiddhiStream<T> union(String firstStreamId,String ... unionStreamIds){ + return new SiddhiStream.SingleSiddhiStream<T>(firstStreamId,this).union(unionStreamIds); + } + + public <T> void registerStream(final String streamId, DataStream<T> dataStream, String... fieldNames) { + if (isStreamDefined(streamId)) { + throw new DuplicatedStreamException("Input stream: " + streamId + " already exists"); + } + dataStreams.put(streamId, dataStream); + SiddhiStreamSchema<T> schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames); + schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig())); + dataStreamSchemas.put(streamId, schema); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return executionEnvironment; + } + + public void registerExtension(String extensionName, Class<?> extensionClass) { + if(extensions.containsKey(extensionName)){ --- End diff -- not formatted code, put space after if and before { > Integrate Siddhi as a lightweight CEP Library > --------------------------------------------- > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP > Affects Versions: 1.2.0 > Reporter: Hao Chen > Labels: cep, library, patch-available > Fix For: 1.2.0 > > > h1. flink-siddhi proposal > h2. 
Abstract > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user-specified queries. > It would be very helpful for Flink users (especially streaming application > developers) to have a library for running Siddhi CEP queries directly in a Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as a stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See > `SiddhiCEP` and `SiddhiStream`) > * Register Flink DataStream associating native type information with > Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc. > * Connect with single or multiple Flink DataStreams with Siddhi CEP > Execution Plan > * Return output stream as DataStream with type intelligently inferred > from Siddhi Stream Schema > * Integrate Siddhi runtime state management with Flink state (See > `AbstractSiddhiOperator`) > * Support Siddhi plugin management to extend CEP functions. (See > `SiddhiCEP#registerExtension`) > h2. Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. 
Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2 , custom:plus(s1.price,s2.price) as price" > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)