[ 
https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15485183#comment-15485183
 ] 

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78447224
  
    --- Diff: 
flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java
 ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import 
org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wso2.siddhi.core.ExecutionPlanRuntime;
    +import org.wso2.siddhi.core.SiddhiManager;
    +import org.wso2.siddhi.core.stream.input.InputHandler;
    +import org.wso2.siddhi.query.api.definition.AbstractDefinition;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +
    +public abstract class AbstractSiddhiOperator<IN, OUT> extends 
AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
    +   private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    +   private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    +
    +   private final SiddhiOperatorContext siddhiPlan;
    +   private final String executionExpression;
    +   private final boolean isProcessingTime;
    +   private final Map<String, MultiplexingStreamRecordSerializer<IN>> 
streamRecordSerializers;
    +
    +   private transient SiddhiManager siddhiManager;
    +   private transient ExecutionPlanRuntime siddhiRuntime;
    +   private transient Map<String, InputHandler> inputStreamHandlers;
    +
    +   // queue to buffer out of order stream records
    +   private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    +
    +   /**
    +    * @param siddhiPlan Siddhi CEP  Execution Plan
    +    */
    +   public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
    +           validate(siddhiPlan);
    +           this.executionExpression = siddhiPlan.getFinalExecutionPlan();
    +           this.siddhiPlan = siddhiPlan;
    +           this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() 
== TimeCharacteristic.ProcessingTime;
    +           this.streamRecordSerializers = new HashMap<>();
    +
    +           for (String streamId : this.siddhiPlan.getInputStreams()) {
    +                   streamRecordSerializers.put(streamId, 
createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), 
this.siddhiPlan.getExecutionConfig()));
    +           }
    +   }
    +
    +   protected abstract MultiplexingStreamRecordSerializer<IN> 
createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig 
executionConfig);
    +
    +   protected MultiplexingStreamRecordSerializer<IN> 
getStreamRecordSerializer(String streamId) {
    +           if (streamRecordSerializers.containsKey(streamId)) {
    +                   return streamRecordSerializers.get(streamId);
    +           } else {
    +                   throw new UndefinedStreamException("Stream " + streamId 
+ " not defined");
    +           }
    +   }
    +
    +   @Override
    +   public void processElement(StreamRecord<IN> element) throws Exception {
    --- End diff --
    
    This code seems similar to code in the CEP library. Can we reuse it 
somehow?


> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>              Labels: cep, library, patch-available
>             Fix For: 1.2.0
>
>
> h1. flink-siddhi proposal
> h2. Abstract
> Siddhi CEP is a lightweight and easy-to-use open-source Complex Event 
> Processing (CEP) engine released as a Java library under the `Apache Software 
> License v2.0`. Siddhi CEP processes events generated by various event 
> sources, analyzes them, and reports the matching complex events according 
> to user-specified queries. 
> It would be very helpful to provide Flink users (especially streaming 
> application developers) with a library for running Siddhi CEP queries 
> directly in Flink streaming applications.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. 
> `TupleStreamSiddhiOperator`), supporting rich CEP features such as
>     * Filter
>     * Join
>     * Aggregation
>     * Group by
>     * Having
>     * Window
>     * Conditions and Expressions
>     * Pattern processing
>     * Sequence processing
>     * Event Tables
>     ...
> * Provide an easy-to-use Siddhi CEP API that integrates with the Flink 
> DataStream API (see `SiddhiCEP` and `SiddhiStream`)
>     * Register Flink DataStreams, associating their native type information 
> with a Siddhi stream schema; POJOs, Tuples, primitive types, etc. are supported
>     * Connect one or more Flink DataStreams to a Siddhi CEP 
> execution plan
>     * Return the output stream as a DataStream whose type is inferred 
> from the Siddhi stream schema
> * Integrate Siddhi runtime state management with Flink state (see 
> `AbstractSiddhiOperator`)
> * Support Siddhi plugin management to extend CEP functions (see 
> `SiddhiCEP#registerExtension`)
> h2. Test Cases 
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: 
> https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
>  StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", 
> "price","timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", 
> "price","timestamp");
>  DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
>     "from every s1 = inputStream1[id == 2] "
>      + " -> s2 = inputStream2[id == 3] "
>      + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as 
> name_2 , custom:plus(s1.price,s2.price) as price"
>      + "insert into outputStream"
>   )
>   .returns("outputStream");
>  env.execute();
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to