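Hi Steve, I don’t have experience with Flink CEP, but based on the previous stack trace you posted, I’m guessing that value.getAudio_FFT1() returns null for one of the records. String.contains() throws a NullPointerException when it is passed null, which would match the "Failure happened in filter function" cause in your trace.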
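If that guess is right, a null check inside each filter() would avoid the exception. Here is a minimal, Flink-free sketch of that check. The class name, the helper method, and the AUDIO_YELL value are hypothetical placeholders, since the real PatternConstants value isn't shown in the thread:

```java
import java.util.Locale;

// Hypothetical stand-in for the check done inside each IterativeCondition.filter().
final class NullSafeAudioMatch {
    // Assumed placeholder value; the real PatternConstants.AUDIO_YELL is not shown.
    static final String AUDIO_YELL = "YELL";

    // Returns false instead of throwing when the event's audio field is missing.
    static boolean matchesYell(String audioFft1) {
        if (audioFft1 == null) {
            // String.contains(null) would throw a NullPointerException here.
            return false;
        }
        return AUDIO_YELL.toLowerCase(Locale.ROOT).contains(audioFft1);
    }

    public static void main(String[] args) {
        System.out.println(matchesYell(null));
        System.out.println(matchesYell("yell"));
    }
}
```

In the pattern itself, the same idea would mean returning false from filter() when value.getAudio_FFT1() is null. Whether a record with a missing field should be skipped or treated as an error is a call you'd have to make for your data.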
-- Ken

> On Nov 16, 2018, at 3:40 AM, Steve Bistline <srbistline.t...@gmail.com> wrote:
>
> I have a fairly straightforward project that is generating a null pointer and heap space error.
>
> Any thoughts on where to begin debugging this?
>
> I suspect it is in this part of the code somewhere.
>
> Pattern<Event, ?> pattern = Pattern
>     .<Event> begin("first event").subtype(IoTEvent.class)
>     .where(new IterativeCondition<IoTEvent>()
>     {
>         private static final long serialVersionUID = -6301755149429716724L;
>
>         @Override
>         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>         }
>     })
>     .next("second")
>     .subtype(IoTEvent.class)
>     .where(new IterativeCondition<IoTEvent>() {
>         private static final long serialVersionUID = 2392863109523984059L;
>
>         @Override
>         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>         }
>     })
>     .next("third")
>     .subtype(IoTEvent.class)
>     .where(new IterativeCondition<IoTEvent>() {
>         private static final long serialVersionUID = 2392863109523984059L;
>
>         @Override
>         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>         }
>     })
>     .next("fourth")
>     .subtype(IoTEvent.class)
>     .where(new IterativeCondition<IoTEvent>() {
>         private static final long serialVersionUID = 2392863109523984059L;
>
>         @Override
>         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>         }
>     })
>     .within(Time.seconds(10));
>
> Any help appreciated
>
> Thanks,
>
> SRB
>
> =======================================================
>
> =========================================
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Failure happened in filter function.
>     at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:704)
>     at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:500)
>     at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>     ... 7 more
> Caused by: java.lang.NullPointerException
>
> Steve Bistline <srbistline.t...@gmail.com>
> Thu, Nov 15, 8:28 PM (10 hours ago)
>
> to user
> More to the story.... the cluster kicked out this error:
>
> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "flink-7"
> Uncaught error from thread [Uncaught error from thread [Uncaught error from
> thread [flink-scheduler-1]: flink-akka.remote.default-remote-dispatcher-92]:
> flink-10]: Java heap space, shutting down JVM since
> 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[Java heap
> spaceflink, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
> for for ActorSystem[Uncaught error from thread
> [flink-akka.remote.default-remote-dispatcher-96]: Java heap space, shutting
> down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for
> ActorSystem[flink]
> flink]
> ]
> Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is
> enabled for for ActorSystem[flink]
> java.lang.OutOfMemoryError: Java heap space
> java.lang.OutOfMemoryError: Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>     at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:474)
>     at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:469)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at akka.actor.FSM$class.processEvent(FSM.scala:663)
>     at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:285)
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:285)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> java.lang.OutOfMemoryError: Java heap space

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra