Hi,
I'm trying out the new CEP library but have some problems with event
detection.
In my case Flink detects the event pattern: A followed by B within 10
seconds.
But a short time after the detection, once the event pattern no longer
matches, the program crashes with the following error message:

04/06/2016 11:04:47     Job execution switched to status FAILING.
java.lang.NullPointerException
        at org.apache.flink.cep.nfa.SharedBuffer.extractPatterns(SharedBuffer.java:205)
        at org.apache.flink.cep.nfa.NFA.extractPatternMatches(NFA.java:305)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:142)
        at org.apache.flink.cep.operator.AbstractCEPPatternOperator.processEvent(AbstractCEPPatternOperator.java:93)
        at org.apache.flink.cep.operator.CEPPatternOperator.processWatermark(CEPPatternOperator.java:88)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:158)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


After that, the job is restarted and runs fine until the
AbstractCEPPatternOperator fails again.

This is my code:
Pattern<Tuple5<String, String, Double, Double, Double>, ?> FlowPattern =
    Pattern.<Tuple5<String, String, Double, Double, Double>>begin("start")
        .followedBy("FlowOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .followedBy("PressureOver10")
        .where(new FilterFunction<Tuple5<String, String, Double, Double, Double>>() { /* some filter */ })
        .within(Time.seconds(10));

PatternStream<Tuple5<String, String, Double, Double, Double>> FlowFirstPatternStream =
    CEP.pattern(windowedData, FlowPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new FlowPatternWarning());
warning.print();

private static class FlowPatternWarning implements
        PatternSelectFunction<Tuple5<String, String, Double, Double, Double>, String> {
    @Override
    public String select(Map<String, Tuple5<String, String, Double, Double, Double>> pat)
            throws Exception {
        Tuple5<String, String, Double, Double, Double> pressure = pat.get("PressureOver10");
        Tuple5<String, String, Double, Double, Double> flow = pat.get("FlowOver10");

        return "  #######   Warning! FlowPattern   ####### "
                + pressure.toString() + " - " + flow.toString();
    }
}
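
To make clear what I expect the pattern to do, here is a minimal plain-Java
sketch of "A followed by B within 10 seconds". It does not use Flink at all and
is not how the CEP library works internally. The names FollowedBySketch, Event,
Match and findMatches are made up for illustration, and treating the 10-second
boundary as inclusive is only my assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the expected semantics only; NOT Flink's implementation.
class FollowedBySketch {

    /** Hypothetical event: a type tag ("A", "B", or anything else) and a timestamp in ms. */
    static final class Event {
        final String type;
        final long timestampMs;

        Event(String type, long timestampMs) {
            this.type = type;
            this.timestampMs = timestampMs;
        }
    }

    /** One detected match: an A event and a later B event. */
    static final class Match {
        final Event a;
        final Event b;

        Match(Event a, Event b) {
            this.a = a;
            this.b = b;
        }
    }

    /**
     * Returns every (A, B) pair where B comes after A and at most windowMs later.
     * Unrelated events in between are skipped. Assumes the list is sorted by timestamp.
     */
    static List<Match> findMatches(List<Event> events, long windowMs) {
        List<Match> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event first = events.get(i);
            if (!"A".equals(first.type)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                Event second = events.get(j);
                if (second.timestampMs - first.timestampMs > windowMs) {
                    break; // window expired, no later event can match this A
                }
                if ("B".equals(second.type)) {
                    matches.add(new Match(first, second));
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("A", 0L));
        events.add(new Event("C", 2_000L));  // unrelated event between A and B
        events.add(new Event("B", 5_000L));  // inside the 10 s window
        events.add(new Event("B", 20_000L)); // outside the window
        for (Match m : findMatches(events, 10_000L)) {
            System.out.println("match: A@" + m.a.timestampMs + " -> B@" + m.b.timestampMs);
        }
    }
}
```

With this input I would expect exactly one match (A at 0 ms, B at 5000 ms) and
no exception once the window has passed without a further match.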


How can I solve this?
I hope somebody can help me.

greetz Norman




