Hi Kostas, I was able to reproduce the error with 1.4.0. After upgrading the cluster to 1.5 snapshot and running through the same data I am still experiencing the same exception. CEP patterns that I am running are using followed by patterns e.g AfBfC. From my experience I was never able to get stable execution when checkpoints are enabled. When I disable checkpoints CEP jobs are running fine. Aside from this particular error I also notice that majority of checkpoints expire as the do not complete within configured 5 min timeout period. Any suggestions on further debugging runtime checkpoints would be very helpful. Thanks in advance for your assistance.
Regards, Varun > On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <k.klou...@data-artisans.com> > wrote: > > Thanks a lot Varun! > > Kostas > >> On Jan 17, 2018, at 9:59 PM, Varun Dhore <varundhor...@gmail.com> wrote: >> >> Thank you Kostas. Since this error is not easily reproducible on my end I’ll >> continue testing this and confirm the resolution once I am able to do so. >> >> Thanks, >> Varun >> >> Sent from my iPhone >> >>> On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <k.klou...@data-artisans.com> >>> wrote: >>> >>> Hi Varun, >>> >>> This can be related to this issue: >>> https://issues.apache.org/jira/browse/FLINK-8226 >>> which is currently fixed on the master. >>> >>> Could you please try the current master to see if the error persists? >>> >>> Thanks, >>> Kostas >>> >>>>> On Jan 15, 2018, at 4:09 PM, Varun Dhore <varundhor...@gmail.com> wrote: >>>>> >>>>> >>>>> >>>>> Hello Flink community, >>>>> >>>>> I have encountered following exception while testing 1.4.0 release. This >>>>> error is occurring intermittently and my CEP job keeps restarting after >>>>> this exception. I am running the job with Event time semantics and >>>>> checkpoints enabled. >>>>> >>>>> >>>>> java.lang.RuntimeException: Exception occurred while >>>>> processing valve output watermark: >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) >>>>> at >>>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) >>>>> at >>>>> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> Caused by: java.lang.RuntimeException: Error while adding >>>>> data to RocksDB >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102) >>>>> at >>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276) >>>>> at >>>>> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248) >>>>> at >>>>> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) >>>>> at >>>>> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) >>>>> at >>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) >>>>> ... 7 more >>>>> Caused by: java.lang.IllegalStateException: Could not find id >>>>> for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: ”e1”, >>>>> timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, >>>>> 1)], 2) >>>>> at >>>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) >>>>> at >>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972) >>>>> at >>>>> org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839) >>>>> at >>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919) >>>>> at >>>>> org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99) >>>>> ... 13 more >>>>> >>>>> >>>>> Thanks, >>>>> Varun >>> >