Hi Steve, It seems the NPE caused by the property of the IoTEvent's instance. Can you make sure the property is not null?
Thanks, vino. Steve Bistline <srbistline.t...@gmail.com> 于2018年11月21日周三 上午2:09写道: > Any guidance would be most appreciated. > > Thx > > Steve > =========================================== > > java.lang.RuntimeException: Exception occurred while processing valve output > watermark: > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: > org.apache.flink.util.FlinkRuntimeException: Failure happened in filter > function. > at > org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at > java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590) > at > org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279) > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) > ... 7 more > Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in > filter function. > at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731) > at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541) > at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284) > at org.apache.flink.cep.nfa.NFA.process(NFA.java:220) > at > org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379) > at > org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282) > ... 14 more > Caused by: java.lang.NullPointerException > at java.lang.String.contains(String.java:2133) > at > com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119) > at > com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114) > at > org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43) > at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742) > at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716) > ... 19 more > > > > ================================================== > > > The code > > > // Consume the data streams from AWS Kinesis stream > DataStream<Event> dataStream = env.addSource(new > FlinkKinesisConsumer<>( > pt.getRequired("stream"), > new EventSchema(), > kinesisConsumerConfig)) > .name("Kinesis Stream Consumer"); > > //dataStream.print(); > > DataStream<Event> kinesisStream = dataStream > .assignTimestampsAndWatermarks(new > TimeLagWatermarkGenerator()) > .map(event -> (IoTEvent) event); > > // Prints the mapped records from the Kinesis stream > > //kinesisStream.print(); > > > Pattern<Event, ?> pattern = Pattern > .<Event> begin("first event").subtype(IoTEvent.class) > .where(new IterativeCondition<IoTEvent>() > { > //private static final long serialVersionUID = > -6301755149429716724L; > > @Override > public boolean filter(IoTEvent value, Context<IoTEvent> > ctx) throws Exception { > return > PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); > } > }) > .next("second") > .subtype(IoTEvent.class) > .where(new IterativeCondition<IoTEvent>() { > //private static final long serialVersionUID = > 2392863109523984059L; > > @Override > public boolean filter(IoTEvent value, Context<IoTEvent> > ctx) throws Exception { > return > PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); > } > }) > .next("third") > .subtype(IoTEvent.class) > .where(new IterativeCondition<IoTEvent>() { > private static final long serialVersionUID = > 2392863109523984059L; > > @Override > public boolean filter(IoTEvent value, Context<IoTEvent> > ctx) throws Exception { > return > PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); > } > }) > .next("fourth") > .subtype(IoTEvent.class) > .where(new IterativeCondition<IoTEvent>() { > private static final long serialVersionUID = > 2392863109523984059L; > > @Override > public boolean filter(IoTEvent value, Context<IoTEvent> > ctx) throws Exception { > return > PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); > } > }) > .within(Time.seconds(10)); > > > // Match the pattern in the input data stream > PatternStream<Event> patternStream = CEP.pattern(kinesisStream, > pattern); > > // Detects MOTION pattern match and alert > DataStream<Alert> alerts = patternStream.select( > new PatternSelectFunction<Event, Alert>() { > @Override > public Alert select(Map<String, List<Event>> pattern) > throws Exception { > Alert alert = new Alert(pattern); > System.out.printf("AUDIO ALERT\n"); > > > return alert; > } > > }).name("Audio Alert Sink"); > >