Hi Steve,

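It seems the NPE is caused by a null property on the IoTEvent instance. The
stack trace shows String.contains failing inside your filter function, which
happens when its argument (here value.getAudio_FFT1()) is null. Can you
make sure the property is not null?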
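
If the property can legitimately be missing on some events, one option is to
guard the comparison so the filter returns false instead of throwing. The
sketch below is only an illustration under a few assumptions: that
getAudio_FFT1() returns a String, that a null value should simply not match,
and that the helper class NullSafeAudioMatch and the constant value "YELL"
are hypothetical stand-ins (the real PatternConstants.AUDIO_YELL is not shown
in this thread).

```java
import java.util.Locale;

public class NullSafeAudioMatch {

    // Hypothetical stand-in for PatternConstants.AUDIO_YELL (real value not shown in the thread).
    static final String AUDIO_YELL = "YELL";

    // Same check as in the filter functions, but returns false when either side is null
    // instead of letting String.contains throw a NullPointerException.
    public static boolean matches(String constant, String audioFft) {
        if (constant == null || audioFft == null) {
            return false;
        }
        return constant.toLowerCase(Locale.ROOT).contains(audioFft);
    }

    public static void main(String[] args) {
        System.out.println(matches(AUDIO_YELL, "yell")); // property present and matching
        System.out.println(matches(AUDIO_YELL, null));   // property missing: no match, no NPE
    }
}
```

Inside each IterativeCondition, the body of filter could then become
something like
`return NullSafeAudioMatch.matches(PatternConstants.AUDIO_YELL, value.getAudio_FFT1());`.
Whether a missing value should count as "no match" or should be rejected
earlier (for example when the event is deserialized) depends on your data.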

Thanks, vino.

Steve Bistline <srbistline.t...@gmail.com> wrote on Wed, Nov 21, 2018 at 2:09 AM:

> Any guidance would be most appreciated.
>
> Thx
>
> Steve
> ===========================================
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>       at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
>       at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>       at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
>       at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>       at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>       ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>       at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>       at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>       at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>       at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>       at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
>       ... 14 more
> Caused by: java.lang.NullPointerException
>       at java.lang.String.contains(String.java:2133)
>       at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
>       at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
>       at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
>       at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>       at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>       ... 19 more
>
>
>
> ==================================================
>
>
> The code
>
>
>         // Consume the data streams from AWS Kinesis stream
>         DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>                 pt.getRequired("stream"),
>                 new EventSchema(),
>                 kinesisConsumerConfig))
>                 .name("Kinesis Stream Consumer");
>
>         //dataStream.print();
>
>         DataStream<Event> kinesisStream = dataStream
>                 .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>                 .map(event -> (IoTEvent) event);
>
>         // Prints the mapped records from the Kinesis stream
>
>         //kinesisStream.print();
>
>
>         Pattern<Event, ?> pattern = Pattern
>                 .<Event> begin("first event").subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>()
>                 {
>                     //private static final long serialVersionUID = -6301755149429716724L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .next("second")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     //private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .next("third")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .next("fourth")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .within(Time.seconds(10));
>
>
>         // Match the pattern in the input data stream
>         PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);
>
>         // Detects MOTION pattern match and alert
>         DataStream<Alert> alerts = patternStream.select(
>                 new PatternSelectFunction<Event, Alert>() {
>                     @Override
>                     public Alert select(Map<String, List<Event>> pattern) throws Exception {
>                         Alert alert = new Alert(pattern);
>                         System.out.printf("AUDIO ALERT\n");
>
>                         return alert;
>                     }
>                 }).name("Audio Alert Sink");
>
>
