Hi,

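I think vino is right. The NullPointerException is thrown inside your
condition: the trace shows String.contains failing in the filter at
FlinkCEPAudio.java:119, which happens when the argument passed to
contains() is null. Please handle the case where the string you are
comparing (the result of value.getAudio_FFT1()) is null.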
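
To illustrate, here is a minimal sketch of a null-safe version of that
comparison in plain Java, without any Flink dependency. The class name
NullSafeAudioCheck, the helper matchesYell and the value "YELL" are
placeholders I made up for this example; the real
PatternConstants.AUDIO_YELL is not shown in the thread.

```java
public class NullSafeAudioCheck {
    // Hypothetical stand-in for PatternConstants.AUDIO_YELL (actual value unknown).
    public static final String AUDIO_YELL = "YELL";

    // Same comparison as in the filter() methods, but guarded against null.
    public static boolean matchesYell(String audioFft1) {
        // String.contains(null) throws a NullPointerException,
        // so treat a missing value as "no match" before calling it.
        return audioFft1 != null
                && AUDIO_YELL.toLowerCase().contains(audioFft1);
    }

    public static void main(String[] args) {
        System.out.println(matchesYell(null));      // no exception, no match
        System.out.println(matchesYell("yell"));    // substring of "yell"
        System.out.println(matchesYell("whisper")); // not a substring
    }
}
```

Inside each IterativeCondition the filter body would then presumably
become something like `return matchesYell(value.getAudio_FFT1());`.
Whether a null value should count as "no match" or be dropped earlier in
the pipeline depends on your use case.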

Best,

Dawid


On 21/11/2018 04:32, vino yang wrote:
> Hi Steve,
>
> It seems the NPE is caused by a property of the IoTEvent instance.
> Can you make sure the property is not null?
>
> Thanks, vino.
>
> Steve Bistline <srbistline.t...@gmail.com
> <mailto:srbistline.t...@gmail.com>> wrote on Wed, Nov 21, 2018 at 2:09 AM:
>
>     Any guidance would be most appreciated.
>
>     Thx
>
>     Steve
>     ===========================================
>
>     java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
>     Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
>       at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>       at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>       at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>       ... 7 more
>     Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>       at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>       at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>       at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>       at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
>       ... 14 more
>     Caused by: java.lang.NullPointerException
>       at java.lang.String.contains(String.java:2133)
>       at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
>       at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
>       at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
>       at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>       at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>       ... 19 more
>
>     ==================================================
>
>     The code
>
>     // Consume the data streams from AWS Kinesis stream
>     DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>                     pt.getRequired("stream"), new EventSchema(), kinesisConsumerConfig))
>                     .name("Kinesis Stream Consumer");
>     //dataStream.print();
>
>     DataStream<Event> kinesisStream = dataStream
>                     .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>                     .map(event -> (IoTEvent) event);
>
>     // Prints the mapped records from the Kinesis stream
>     //kinesisStream.print();
>
>     Pattern<Event, ?> pattern = Pattern
>                     .<Event> begin("first event").subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         //private static final long serialVersionUID = -6301755149429716724L;
>                         @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
>                         }
>                     })
>                     .next("second")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         //private static final long serialVersionUID = 2392863109523984059L;
>                         @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
>                         }
>                     })
>                     .next("third")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         private static final long serialVersionUID = 2392863109523984059L;
>                         @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
>                         }
>                     })
>                     .next("fourth")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         private static final long serialVersionUID = 2392863109523984059L;
>                         @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
>                         }
>                     })
>                     .within(Time.seconds(10));
>
>     // Match the pattern in the input data stream
>     PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);
>
>     // Detects MOTION pattern match and alert
>     DataStream<Alert> alerts = patternStream.select(
>                     new PatternSelectFunction<Event, Alert>() {
>                         @Override
>                         public Alert select(Map<String, List<Event>> pattern) throws Exception {
>                             Alert alert = new Alert(pattern);
>                             System.out.printf("AUDIO ALERT\n");
>                             return alert;
>                         }
>                     }).name("Audio Alert Sink");
>
