Hi, I'm trying out the new CEP library but have some problems with event detection. In my case Flink detects the event pattern: A followed by B within 10 seconds. But shortly after the event is detected, once the pattern no longer matches, the program crashes with the error message:

04/06/2016 11:04:47 Job execution switched to status FAILING.
java.lang.NullPointerException
    at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
    at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
    at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
    at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

After that, the job execution is restarted and proceeds well until the next AbstractCEPPatternOperator fails.

That's my code:

    Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowFirstPattern =
        Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
            .followedBy("FlowOver10")
            .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() {
                // some Filter
            })
            .followedBy("PressureOver10")
            .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() {
                // some Filter
            })
            .within(Time.seconds(10));

    PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream =
        CEP.pattern(windowedData, FlowFirstPattern);

    DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
    warning.print();

    private static class FlowPatternWarning
            implements PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
        @Override
        public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat) throws Exception {
            Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
            Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");
            return " ####### Warning! FlowPattern ####### " + pressure.toString() + " - " + flow.toString();
        }
    }

How can I solve that? I hope somebody can help me.

greetz
Norman

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-AbstractCEPPatternOperator-fail-after-event-detection-tp5948.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.