[ https://issues.apache.org/jira/browse/FLINK-4169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15966258#comment-15966258 ]
Shashank Agarwal commented on FLINK-4169:
-----------------------------------------

I am getting this error again in Flink 1.2, running on a YARN cluster. After switching the state backend away from RocksDB, it works fine.

```
04/12/2017 10:05:04 Job execution switched to status FAILING.
java.lang.RuntimeException: Could not deserialize NFA.
    at org.apache.flink.cep.nfa.NFA$Serializer.deserialize(NFA.java:538)
    at org.apache.flink.cep.nfa.NFA$Serializer.deserialize(NFA.java:469)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:81)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:124)
    at org.apache.flink.cep.operator.AbstractCEPBasePatternOperator.processElement(AbstractCEPBasePatternOperator.java:72)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:162)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: co.ronak.nto.Job$$anon$18$$anon$21$$anon$3
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:53)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
    at org.apache.flink.cep.NonDuplicatingTypeSerializer.readObject(NonDuplicatingTypeSerializer.java:190)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
    at org.apache.flink.cep.nfa.NFA.readObject(NFA.java:394)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.cep.nfa.NFA$Serializer.deserialize(NFA.java:535)
    ... 10 more
```

> CEP Does Not Work with RocksDB StateBackend
> -------------------------------------------
>
>                 Key: FLINK-4169
>                 URL: https://issues.apache.org/jira/browse/FLINK-4169
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> A job will never match any patterns because {{ValueState.update()}} is not
> called in the keyed CEP operators for updating the {{NFA}} state and the
> priority queue state.
> The reason why it works for other state backends is that they are very lax in
> their handling of state: if the object returned from {{ValueState.value()}}
> is mutable, changes to it will be reflected in checkpoints even if
> {{ValueState.update()}} is not called. RocksDB, on the other hand,
> always deserializes/serializes state values when accessing/updating them, so
> changes to the returned object will not be reflected in the state unless
> {{update()}} is called.
> We should fix this and also add a test for it. This might be tricky because
> we have to pull together RocksDB and CEP.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
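The difference between the backends described in the issue can be sketched in plain Java. This is not Flink code: `SimpleValueState`, `HeapLikeState`, `SerializingState`, and `sizeAfterMutation` are hypothetical names made up for illustration, and Java serialization stands in for whatever serializer a real backend uses. The sketch only assumes what the description states, namely that one kind of backend hands out the stored object itself while the other serializes on `update()` and deserializes on `value()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

class StateUpdateSketch {

    // Hypothetical stand-in for a value-state handle; not the Flink interface.
    interface SimpleValueState<T> {
        T value();
        void update(T newValue);
    }

    // Keeps a reference to the object, so in-place mutations are visible later.
    static final class HeapLikeState<T> implements SimpleValueState<T> {
        private T current;
        public T value() { return current; }
        public void update(T newValue) { current = newValue; }
    }

    // Stores bytes only: value() returns a fresh copy, update() writes a new snapshot.
    static final class SerializingState<T extends Serializable> implements SimpleValueState<T> {
        private byte[] bytes;

        public T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not deserialize state", e);
            }
        }

        public void update(T newValue) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(newValue);
            } catch (IOException e) {
                throw new IllegalStateException("Could not serialize state", e);
            }
            bytes = buffer.toByteArray();
        }
    }

    // Mutates the object returned by value() and optionally writes it back,
    // then reports how many elements the state holds afterwards.
    static int sizeAfterMutation(SimpleValueState<ArrayList<String>> state, boolean callUpdate) {
        state.update(new ArrayList<String>());
        ArrayList<String> events = state.value();
        events.add("event-1");
        if (callUpdate) {
            state.update(events);
        }
        return state.value().size();
    }

    public static void main(String[] args) {
        // Reference-holding state: the mutation sticks without update().
        System.out.println(sizeAfterMutation(new HeapLikeState<ArrayList<String>>(), false));    // prints 1
        // Serializing state: the mutation is lost without update().
        System.out.println(sizeAfterMutation(new SerializingState<ArrayList<String>>(), false)); // prints 0
        // Serializing state: the mutation is kept once update() is called.
        System.out.println(sizeAfterMutation(new SerializingState<ArrayList<String>>(), true));  // prints 1
    }
}
```

Under these assumptions, code that mutates the returned object and skips `update()` appears to work on the reference-holding state and silently loses its changes on the serializing one, which matches the behaviour the issue describes for the keyed CEP operators.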