Hi again Varun,

I am investigating the problem you mentioned and I found a bug in the SharedBuffer, but I am not sure if it is the only bug that affects you.
Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv and let me know if the problem is still there?

In addition, are you using Scala with case classes or Java?

Thanks for helping fix the problem,
Kostas

> On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Varun,
>
> Thanks for taking time to look into it. Could you give a sample input and
> your pattern to reproduce the problem?
> That would help a lot in figuring out the cause of the problem.
>
> Thanks,
> Kostas
>
>> On Jan 23, 2018, at 5:40 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>>
>> Hi Kostas,
>>
>> I was able to reproduce the error with 1.4.0. After upgrading the cluster to
>> 1.5 snapshot and running through the same data I am still experiencing the
>> same exception. CEP patterns that I am running are using followed by
>> patterns, e.g. AfBfC. From my experience I was never able to get stable
>> execution when checkpoints are enabled. When I disable checkpoints CEP jobs
>> are running fine. Aside from this particular error I also notice that the
>> majority of checkpoints expire as they do not complete within the configured
>> 5 min timeout period. Any suggestions on further debugging runtime checkpoints
>> would be very helpful.
>> Thanks in advance for your assistance.
>>
>> Regards,
>> Varun
>>
>> On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>
>>> Thanks a lot Varun!
>>>
>>> Kostas
>>>
>>>> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>>>>
>>>> Thank you Kostas. Since this error is not easily reproducible on my end
>>>> I’ll continue testing this and confirm the resolution once I am able to do
>>>> so.
>>>>
>>>> Thanks,
>>>> Varun
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>>
>>>>> Hi Varun,
>>>>>
>>>>> This can be related to this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-8226
>>>>> which is currently fixed on the master.
>>>>>
>>>>> Could you please try the current master to see if the error persists?
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Flink community,
>>>>>>>
>>>>>>> I have encountered the following exception while testing the 1.4.0 release.
>>>>>>> This error is occurring intermittently and my CEP job keeps restarting
>>>>>>> after this exception. I am running the job with Event time semantics
>>>>>>> and checkpoints enabled.
>>>>>>>
>>>>>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>>>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
>>>>>>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
>>>>>>>     at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
>>>>>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>>>>>     ... 7 more
>>>>>>> Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 2)
>>>>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>>>>>>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
>>>>>>>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
>>>>>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
>>>>>>>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
>>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
>>>>>>>     ... 13 more
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Varun