Any guidance would be most appreciated. Thx
Steve =========================================== java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function. at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279) at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262) ... 7 more Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function. at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731) at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541) at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284) at org.apache.flink.cep.nfa.NFA.process(NFA.java:220) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282) ... 14 more Caused by: java.lang.NullPointerException at java.lang.String.contains(String.java:2133) at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119) at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114) at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43) at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742) at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716) ... 19 more ================================================== The code // Consume the data streams from AWS Kinesis stream DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>( pt.getRequired("stream"), new EventSchema(), kinesisConsumerConfig)) .name("Kinesis Stream Consumer"); //dataStream.print(); DataStream<Event> kinesisStream = dataStream .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator()) .map(event -> (IoTEvent) event); // Prints the mapped records from the Kinesis stream //kinesisStream.print(); Pattern<Event, ?> pattern = Pattern .<Event> begin("first event").subtype(IoTEvent.class) .where(new IterativeCondition<IoTEvent>() { //private static final long serialVersionUID = -6301755149429716724L; @Override public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception { return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); } }) .next("second") .subtype(IoTEvent.class) .where(new IterativeCondition<IoTEvent>() { //private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception { return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); } }) .next("third") .subtype(IoTEvent.class) .where(new IterativeCondition<IoTEvent>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception { return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); } }) .next("fourth") .subtype(IoTEvent.class) .where(new IterativeCondition<IoTEvent>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception { return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); } }) .within(Time.seconds(10)); // Match the pattern in the input data stream PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern); // Detects MOTION pattern match and alert DataStream<Alert> alerts = patternStream.select( new PatternSelectFunction<Event, Alert>() { @Override public Alert select(Map<String, List<Event>> pattern) throws Exception { Alert alert = new Alert(pattern); System.out.printf("AUDIO ALERT\n"); return alert; } }).name("Audio Alert Sink");