I am running Flink 1.3.0 against Kafka 0.10. I managed to bring the flink
cluster up and have been running my flink CEP job for more than 3 hours
when I see the following exception :

The messages consumed from Kafka are protobuf messages and I use a protobuf
serializer. i have no clue as to where is this exception coming from. Can
someone help?



java.lang.IllegalStateException: Could not initialize keyed state backend.
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
    at
java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2828)
    at
java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2862)
    at
java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2764)
    at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:2196)
    at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1838)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1203)
    at
org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1161)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:948)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:839)
    at
org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
    at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:473)
    at
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:354)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:771)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more

Reply via email to