[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485183#comment-15485183 ]
ASF GitHub Bot commented on FLINK-4520: --------------------------------------- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2487#discussion_r78447224 --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.siddhi.operator; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException; +import org.apache.flink.contrib.siddhi.schema.StreamSchema; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; + +public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class); + private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; + + private final SiddhiOperatorContext 
siddhiPlan; + private final String executionExpression; + private final boolean isProcessingTime; + private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers; + + private transient SiddhiManager siddhiManager; + private transient ExecutionPlanRuntime siddhiRuntime; + private transient Map<String, InputHandler> inputStreamHandlers; + + // queue to buffer out of order stream records + private transient PriorityQueue<StreamRecord<IN>> priorityQueue; + + /** + * @param siddhiPlan Siddhi CEP Execution Plan + */ + public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) { + validate(siddhiPlan); + this.executionExpression = siddhiPlan.getFinalExecutionPlan(); + this.siddhiPlan = siddhiPlan; + this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime; + this.streamRecordSerializers = new HashMap<>(); + + for (String streamId : this.siddhiPlan.getInputStreams()) { + streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig())); + } + } + + protected abstract MultiplexingStreamRecordSerializer<IN> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig); + + protected MultiplexingStreamRecordSerializer<IN> getStreamRecordSerializer(String streamId) { + if (streamRecordSerializers.containsKey(streamId)) { + return streamRecordSerializers.get(streamId); + } else { + throw new UndefinedStreamException("Stream " + streamId + " not defined"); + } + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { --- End diff -- This code seems to be similar to the code from CEP library. Can we reuse it somehow? 
> Integrate Siddhi as a lightweight CEP Library > --------------------------------------------- > > Key: FLINK-4520 > URL: https://issues.apache.org/jira/browse/FLINK-4520 > Project: Flink > Issue Type: New Feature > Components: CEP > Affects Versions: 1.2.0 > Reporter: Hao Chen > Labels: cep, library, patch-available > Fix For: 1.2.0 > > > h1. flink-siddhi proposal > h2. Abstract > Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event > Processing Engine (CEP) released as a Java Library under `Apache Software > License v2.0`. Siddhi CEP processes events which are generated by various > event sources, analyses them and notifies appropriate complex events > according to the user-specified queries. > It would be very helpful for Flink users (especially streaming application > developers) to have a library that runs Siddhi CEP queries directly in a Flink > streaming application. > * http://wso2.com/products/complex-event-processor/ > * https://github.com/wso2/siddhi > h2. Features > * Integrate Siddhi CEP as a stream operator (i.e. > `TupleStreamSiddhiOperator`), supporting rich CEP features like > * Filter > * Join > * Aggregation > * Group by > * Having > * Window > * Conditions and Expressions > * Pattern processing > * Sequence processing > * Event Tables > ... > * Provide an easy-to-use Siddhi CEP API integrated with the Flink DataStream API (see > `SiddhiCEP` and `SiddhiStream`) > * Register a Flink DataStream, associating its native type information with a > Siddhi Stream Schema; POJOs, Tuples, primitive types, etc. are supported > * Connect single or multiple Flink DataStreams to a Siddhi CEP > Execution Plan > * Return the output stream as a DataStream, with its type intelligently inferred > from the Siddhi Stream Schema > * Integrate Siddhi runtime state management with Flink state (see > `AbstractSiddhiOperator`) > * Support Siddhi plugin management to extend CEP functions (see > `SiddhiCEP#registerExtension`) > h2.
Test Cases > * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: > https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java > h2. Example > {code} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); > cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class); > cep.registerStream("inputStream1", input1, "id", "name", > "price","timestamp"); > cep.registerStream("inputStream2", input2, "id", "name", > "price","timestamp"); > DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep > .from("inputStream1").union("inputStream2") > .sql( > "from every s1 = inputStream1[id == 2] " > + " -> s2 = inputStream2[id == 3] " > + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as > name_2, custom:plus(s1.price,s2.price) as price " > + "insert into outputStream" > ) > .returns("outputStream"); > env.execute(); > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)