Dear Flink Users,
I have a Flink (v1.2.1) process that I left running for the last five days. It
aggregates some state and exposes it via Queryable State. It ran correctly
for the first 3 days. Then, with no code changes or data changes, Queryable
State suddenly started misbehaving. The process logs the current value of the
queryable state, and from the logs I could tell that the state was being
aggregated correctly. However, the value returned by Queryable State could not
be deserialized. Instead of the list of longs I expected, I got 2 bytes
(0x57 0x02). It seemed quite clear that the state held in the Task Manager was
not the state I was getting out of Queryable State.

Next, I reasoned that since my data was being checkpointed, I could probably
restore from a checkpoint. So I restarted the process to recover from the last
checkpoint, but it failed with the following error:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
    ... 6 more


To me, this looks like Flink serialized the state incorrectly when writing it out.

I was running Flink 1.2.1. After this happened, I upgraded to Flink 1.3 so that
I could manually set the Kafka partition offsets. I rewound them 5 days to
replay all the data, and now everything is working fine again.

However, I'm more than a little worried. Was a serialization bug fixed in
1.3? I don't believe anything in my code could cause such an issue, but is
there something in my jobs that could make this happen? Is this a known bug?
This problem not only returns bad data from the query but also appears to
break my disaster recovery plan, which makes me rather nervous.

Thanks for your time,
Phil