Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException:
org.apache.flink.util.FlinkRuntimeException: Failure happened in
filter function.
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
        at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at 
java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
        at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
        at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
        ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure
happened in filter function.
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
        at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
        at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
        at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
        at 
org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
        ... 14 more
Caused by: java.lang.NullPointerException
        at java.lang.String.contains(String.java:2133)
        at 
com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
        at 
com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
        at 
org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
        at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
        at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
        ... 19 more



==================================================


The code


      // Consume the data streams from AWS Kinesis stream
        DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
                pt.getRequired("stream"),
                new EventSchema(),
                kinesisConsumerConfig))
                .name("Kinesis Stream Consumer");

       //dataStream.print();

        DataStream<Event> kinesisStream = dataStream
                .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
                .map(event -> (IoTEvent) event);

        // Prints the mapped records from the Kinesis stream

        //kinesisStream.print();


        Pattern<Event, ?> pattern = Pattern
                .<Event> begin("first event").subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>()
                {
                    //private static final long serialVersionUID =
-6301755149429716724L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .next("second")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    //private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .next("third")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                   }
                })
                .next("fourth")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .within(Time.seconds(10));


        // Match the pattern in the input data stream
        PatternStream<Event> patternStream =
CEP.pattern(kinesisStream, pattern);

        // Detects MOTION pattern match and alert
        DataStream<Alert> alerts = patternStream.select(
                new PatternSelectFunction<Event, Alert>() {
                    @Override
                    public Alert select(Map<String, List<Event>>
pattern) throws Exception {
                        Alert alert = new Alert(pattern);
                        System.out.printf("AUDIO ALERT\n");


                        return alert;
                    }

        }).name("Audio Alert Sink");

Reply via email to