I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the Flink cluster up, and my Flink CEP job had been running for more than 3 hours when I saw the exception below.
The messages consumed from Kafka are protobuf messages, and I use a protobuf serializer. I have no clue where this exception is coming from. Can someone help?

java.lang.IllegalStateException: Could not initialize keyed state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
	at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828)
	at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862)
	at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764)
	at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948)
	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839)
	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
	... 6 more