Hi Austin,

Could you elaborate a bit on what you mean by "after a checkpoint fails"? What is the reason the checkpoint fails? Would it be possible for you to prepare a reproducible example of the problem?

Finally, I would also recommend trying out Flink 1.6.x, as we reworked the underlying structure for CEP (SharedBuffer).

Best,
Dawid

On 30/10/2018 20:59, Austin Cawley-Edwards wrote:
> Following up, we are using Flink 1.5.0 and Flink-CEP 2.11.
>
> Thanks,
> Austin
>
> On Tue, Oct 30, 2018 at 3:58 PM Austin Cawley-Edwards
> <austin.caw...@gmail.com> wrote:
>
> Hi there,
>
> We have a streaming application that uses CEP processing but are
> getting this error fairly frequently after a checkpoint fails,
> though not sure if it is related. We have implemented both
> `hashCode` and `equals()` using `Objects.hash(...properties)` and
> basic equality, respectively. Has anyone seen this before using CEP?
>
> Here is the full exception:
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Could not find previous entry
> with key: alertOne, value:
> {"totalDocumentCount":12,"alertLevel":3,"id":"entity:medium:13790","predicted":4.220480902392199,"timestamp":1539700396000}
> and timestamp: 1539700799999. This can indicate that either you did not
> implement the equals() and hashCode() methods of your input elements properly
> or that the element belonging to that entry has been already pruned.
>     at org.apache.flink.cep.nfa.SharedBuffer.put(SharedBuffer.java:107)
>     at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:566)
>     at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>
> Best,
>
> Austin
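
For reference, here is a minimal sketch of the `equals()`/`hashCode()` pairing that the exception message refers to. The class name `AlertEvent` is hypothetical and the field names are assumptions taken from the JSON in the exception above; the actual event class was not posted in this thread. The fields are final so that the hash of an element cannot change after it has been stored, and `equals()` compares exactly the fields that `hashCode()` hashes.

```java
import java.util.Objects;

// Hypothetical event type; field names mirror the JSON shown in the exception.
final class AlertEvent {
    private final String id;
    private final int alertLevel;
    private final int totalDocumentCount;
    private final double predicted;
    private final long timestamp;

    AlertEvent(String id, int alertLevel, int totalDocumentCount,
               double predicted, long timestamp) {
        this.id = id;
        this.alertLevel = alertLevel;
        this.totalDocumentCount = totalDocumentCount;
        this.predicted = predicted;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AlertEvent)) {
            return false;
        }
        AlertEvent that = (AlertEvent) o;
        // Compare exactly the fields that hashCode() hashes.
        // Double.compare agrees with the boxed Double hash used by Objects.hash.
        return alertLevel == that.alertLevel
                && totalDocumentCount == that.totalDocumentCount
                && timestamp == that.timestamp
                && Double.compare(predicted, that.predicted) == 0
                && Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, alertLevel, totalDocumentCount, predicted, timestamp);
    }
}
```

If the real event class already follows this shape, the second cause named in the message (the entry having already been pruned) would be the more likely one to investigate.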