We have encountered a rare but very nasty bug in Flink related to the serialization of POJOs in keyed state.
-- Timeline --

1) At Time1, write a specific item of class C to keyed state. No read of that key happens until step 5.
2) Time elapses.
3) Class C is schema-evolved to include an additional field.
4) Time elapses.
5) When reading the item written in step 1, an EOFException is thrown from AbstractRocksDBState.migrateSerializedValue.
6) Reading the item puts Flink into a restart loop of death. Manual intervention is required.

-- Details at the time of writing the value to keyed state --

class C { // at Time1
    private String fieldAA = "AA";
    private String fieldBB = "BB";
}

The serialized buffer looks like this:

02              flag
00 03 41 41     is_null, len+1, 'A', 'A'
00 03 42 42     is_null, len+1, 'B', 'B'

Serialized field list: [fieldAA, fieldBB]

-- Schema evolution --

class C { // at Time3
    private String fieldAA = "AA";
    private Integer fieldAB = -1;
    private String fieldBB = "BB";
}

-- Details at the time of reading the value from keyed state --

The serialized buffer looks like this:

02              flag
00 03 41 41     is_null, len+1, 'A', 'A'
00 ff ff ff ff  is_null, -1 (4-byte int)
00 03 42 42     is_null, len+1, 'B', 'B'

Serialized field list: [fieldAA, fieldBB]

When reading the buffer, Flink reads fieldAA correctly, but then attempts to read ff ff ff ff as the string for fieldBB. Something has rewritten the buffer so that it contains fieldAB, but the serialized field list does not include fieldAB.

-- Runtime details and notes --

- Flink 1.14.3
- Stateful Functions
- 300 GB savepoint size
- The corruption only seems to happen when a few months elapse between the write and the read.

Questions:

A) Can anyone offer insight into the general mechanism behind POJO serialization?
B) What can cause keyed state to be migrated? Clearly a read does, but what about checkpointing over time, or reading keyed state with a key that is "close" to the other key?
C) If a specific key in keyed state is deserialized from RocksDB, does Flink also deserialize other (adjacent?) keys in the same "block" of data?
D) Are there tools for manually editing Flink savepoints?
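To make the misread in the dumps concrete, here is a small standalone Java sketch. It is not Flink code: the class and method names (PojoMisreadDemo, writeEvolved, readWithOldFieldList) are hypothetical, and it only assumes the layout annotated in the dumps: a flag byte, a boolean null flag before each field, strings written as a 7-bit variable-length len+1 followed by one byte per ASCII char (my understanding of what Flink's StringValue does), and Integer as a 4-byte big-endian int. Under those assumptions, writing with the evolved field list reproduces the read-time buffer byte for byte, and reading it back with the old field list [fieldAA, fieldBB] consumes fieldAB's null flag as fieldBB's, decodes ff ff ff ff 00 as a len+1 of 268,435,455, and runs off the end of the buffer with an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of the byte layout annotated in the dumps. NOT Flink's serializer code.
final class PojoMisreadDemo {

    // Assumed string format: (len + 1) in 7-bit chunks, high bit = "more bytes follow",
    // then one byte per char (ASCII only).
    static void writeString(DataOutputStream out, String s) throws IOException {
        int lenToWrite = s.length() + 1;
        while (lenToWrite >= 0x80) {
            out.write(lenToWrite | 0x80); // write() keeps only the low 8 bits
            lenToWrite >>>= 7;
        }
        out.write(lenToWrite);
        for (char c : s.toCharArray()) {
            out.write(c);
        }
    }

    static String readString(DataInputStream in) throws IOException {
        int len = in.readUnsignedByte();
        if (len == 0) {
            return null; // a stored 0 would mean a null string
        }
        if (len >= 0x80) {
            int shift = 7;
            int curr;
            len &= 0x7f;
            while ((curr = in.readUnsignedByte()) >= 0x80) {
                len |= (curr & 0x7f) << shift;
                shift += 7;
            }
            len |= curr << shift;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < len - 1; i++) {
            sb.append((char) in.readUnsignedByte()); // throws EOFException past the end
        }
        return sb.toString();
    }

    // Writes C as evolved at Time3: flag, then fieldAA, fieldAB, fieldBB (all assumed non-null).
    static byte[] writeEvolved(String aa, Integer ab, String bb) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(0x02);     // flag byte as seen in the dumps
        out.writeBoolean(false); // fieldAA is not null
        writeString(out, aa);
        out.writeBoolean(false); // fieldAB is not null
        out.writeInt(ab);        // -1 -> ff ff ff ff
        out.writeBoolean(false); // fieldBB is not null
        writeString(out, bb);
        out.flush();
        return bytes.toByteArray();
    }

    // Reads with the OLD serialized field list [fieldAA, fieldBB]: flag, then two (null flag, String) pairs.
    static List<String> readWithOldFieldList(byte[] buf) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
        in.readUnsignedByte(); // flag byte
        List<String> values = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            boolean isNull = in.readBoolean();
            values.add(isNull ? null : readString(in));
        }
        return values;
    }

    static String toHex(byte[] buf) {
        StringBuilder sb = new StringBuilder();
        for (byte b : buf) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] original = {0x02, 0x00, 0x03, 0x41, 0x41, 0x00, 0x03, 0x42, 0x42}; // Time1 buffer
        System.out.println(readWithOldFieldList(original)); // [AA, BB]

        byte[] evolved = writeEvolved("AA", -1, "BB");
        System.out.println(toHex(evolved)); // 02 00 03 41 41 00 ff ff ff ff 00 03 42 42
        try {
            readWithOldFieldList(evolved);
        } catch (EOFException e) {
            System.out.println("EOFException while reading fieldBB");
        }
    }
}
```

If the layout assumptions hold, this suggests the stored bytes were produced by a serializer that already knew about fieldAB, while the field list used for reading was still the old one.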