Hello Kostas,

Thanks for the suggestions.

I checked, and I do see my events in the partitionedInput stream when I
print it, but there is still nothing on the alert side. I also checked the
Flink UI for backpressure and everything looks normal. (I have at most 1000
events per second on the Kafka topic, so I don't think backpressure could be
the problem; at least I expect so.)

Also, I haven't yet run the test with my own test data as a collection, but I
tried the following example and did get alerts as a result:


// CEPTest using collection

List<MyEvent> inputElements = new ArrayList<>();
    inputElements.add(new MyEvent(1, 'a', 1, 1));
    inputElements.add(new MyEvent(1, 'b', 1, 2));
    inputElements.add(new MyEvent(1, 'b', 2, 2));
    inputElements.add(new MyEvent(1, 'b', 3, 5));

    Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() {
      private static final long serialVersionUID = 7219646616484327688L;

      @Override
      public boolean filter(MyEvent myEvent) throws Exception {
        return myEvent.getPayload() == 'a';
      }
    }).followedBy("b").where(new FilterFunction<MyEvent>() {
      private static final long serialVersionUID = 7219646616484327688L;

      @Override
      public boolean filter(MyEvent myEvent) throws Exception {
        return myEvent.getPayload() == 'b';
      }
    }).within(Time.seconds(1));//.within(Time.milliseconds(2L));

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    DataStream<MyEvent> input = env.fromCollection(inputElements)
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
      private static final long serialVersionUID = -6619787346214245526L;

      @Override
      public long extractAscendingTimestamp(MyEvent myEvent) {
        return myEvent.getTimestamp();
      }
    });

    PatternStream<MyEvent> patternStream = CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() {
      private static final long serialVersionUID = 6928745840509494198L;

      @Override
      public Long getKey(MyEvent myEvent) throws Exception {
        return myEvent.getId();
      }
    }), pattern);


    patternStream.select(new PatternTimeoutFunction<MyEvent, String>() {
      @Override
      public String timeout(Map<String, MyEvent> map, long l) throws Exception {
        return map.toString() +" @ "+ l;
      }

      private static final long serialVersionUID = 300759199619789416L;


    }, new PatternSelectFunction<MyEvent, String>() {

      @Override
      public String select(Map<String, MyEvent> map) throws Exception {
        return map.toString();
      }

      private static final long serialVersionUID = 732172159423132724L;
    }).print();



In addition, I have now upgraded my Flink Maven project to 1.4-SNAPSHOT, and
there seems to be a problem there.

According to the CEP documentation
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html>,
a select function looks like this:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
    @Override
    public OUT select(Map<String, IN> pattern) {
        IN startEvent = pattern.get("start");
        IN endEvent = pattern.get("end");
        return new OUT(startEvent, endEvent);
    }
}

but when I implement it, the method is expected to take a list of events for
each pattern name:

class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
  @Override
  public OUT select(Map<String, List<IN>> map) throws Exception {
    return null;
  }
}

I am not really sure what I am doing wrong here; any input would be really
helpful.
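
My current guess, which may well be wrong, is that each pattern name now maps
to a list of matched events instead of a single event, so the select body would
have to unpack the lists. Here is a minimal, self-contained sketch of that
idea. Note that ListSelectFunction, ListSelectSketch, firstOf and PAIR are
names I made up for illustration (a simplified local stand-in, not the Flink
interface), and "start"/"end" are just the pattern names from the docs snippet:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that only mirrors the list-based parameter type shown
// above. It is NOT the Flink PatternSelectFunction, so this compiles without Flink.
interface ListSelectFunction<IN, OUT> {
    OUT select(Map<String, List<IN>> pattern);
}

class ListSelectSketch {
    // Returns the first event matched under the given pattern name,
    // or null when the name is absent or its list is empty.
    static <IN> IN firstOf(Map<String, List<IN>> pattern, String name) {
        List<IN> events = pattern.get(name);
        return (events == null || events.isEmpty()) ? null : events.get(0);
    }

    // Joins the first "start" event and the first "end" event into one string.
    static final ListSelectFunction<String, String> PAIR =
        pattern -> firstOf(pattern, "start") + "->" + firstOf(pattern, "end");

    public static void main(String[] args) {
        Map<String, List<String>> match = new HashMap<>();
        match.put("start", Arrays.asList("a"));
        match.put("end", Arrays.asList("b1", "b2"));
        System.out.println(PAIR.select(match));
    }
}
```

If that guess is right, a pattern without quantifiers would presumably carry
exactly one element per list, and taking the first element would give back what
the old signature returned.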

Regards,
Biplob




