Hello Kostas, Thanks for the suggestions.
I checked and I am getting my events in the partitionedInput stream when i am printing it but still nothing on the alert side. I checked flink UI for backpressure and all seems to be normal (I am having at max 1000 events per second on the kafka topic so don't think backpressure could be a problem, atleast I expect so) Also, I haven't run my test with my test data as a collection but I tried with this following example and I did get alerts as a result: // CEPTest using collection List<MyEvent> inputElements = new ArrayList<>(); inputElements.add(new MyEvent(1, 'a', 1, 1)); inputElements.add(new MyEvent(1, 'b', 1, 2)); inputElements.add(new MyEvent(1, 'b', 2, 2)); inputElements.add(new MyEvent(1, 'b', 3, 5)); Pattern<MyEvent, ?> pattern = Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() { private static final long serialVersionUID = 7219646616484327688L; @Override public boolean filter(MyEvent myEvent) throws Exception { return myEvent.getPayload() == 'a'; } }).followedBy("b").where(new FilterFunction<MyEvent>() { private static final long serialVersionUID = 7219646616484327688L; @Override public boolean filter(MyEvent myEvent) throws Exception { return myEvent.getPayload() == 'b'; } }).within(Time.seconds(1));//.within(Time.milliseconds(2L)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(1000); DataStream<MyEvent> input = env.fromCollection(inputElements).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() { private static final long serialVersionUID = -6619787346214245526L; @Override public long extractAscendingTimestamp(MyEvent myEvent) { return myEvent.getTimestamp(); } }); PatternStream<MyEvent> patternStream = CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() { private static final long serialVersionUID = 6928745840509494198L; @Override public Long getKey(MyEvent myEvent) throws Exception { return myEvent.getId(); } }), pattern); patternStream.select(new PatternTimeoutFunction<MyEvent, String>() { @Override public String timeout(Map<String, MyEvent> map, long l) throws Exception { return map.toString() +" @ "+ l; } private static final long serialVersionUID = 300759199619789416L; }, new PatternSelectFunction<MyEvent, String>() { @Override public String select(Map<String, MyEvent> map) throws Exception { return map.toString(); } private static final long serialVersionUID = 732172159423132724L; }).print(); Also along with that now I upgraded my flink maven project to 1.4-Snapshot and there seems to be a problem there. According to this <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html> : class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> { @Override public OUT select(Map<String, IN> pattern) { IN startEvent = pattern.get("start"); IN endEvent = pattern.get("end"); return new OUT(startEvent, endEvent); } } but when I am doing it it expects a list from my side for the events: class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> { @Override public OUT select(Map<String, List<IN>> map) throws Exception { return null; } } Not really sure what am I doing wrong here, any inputs would be really helpful. Regards, Biplob -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333p13341.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.