We have encountered a rare but very nasty bug in Flink related to the
serialization of POJOs in keystate.

-- Timeline --
1) At Time1, write a specific item of class C to keystate. That key is
not read again until step 5.
2) Time elapses
3) Class C is schema-evolved to include an additional field (Time3)
4) Time elapses
5) When reading the specific item written above, we get an EOFException
thrown in AbstractRocksDBState.migrateSerializedValue
6) Reading the item puts Flink into a restart loop of death.  Manual
intervention is required.

-- details at the time of writing the value to keystate --
class C {  // at Time1
 private String fieldAA = "AA";
 private String fieldBB = "BB";
}

The serialized buffer looks like so:
02              flag
00 03 41 41     is_null, len+1, 'A', 'A'
00 03 42 42     is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

-- schema evolution --
class C {  // at Time3
 private String fieldAA = "AA";
 private Integer fieldAB = -1;
 private String fieldBB = "BB";
}

-- details at the time of reading the value from keystate --
The serialized buffer looks like so:
02              flag
00 03 41 41     is_null, len+1, 'A', 'A'
00 ff ff ff ff  is_null, -1
00 03 42 42     is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

When reading the buffer, Flink reads fieldAA just fine, then attempts to
read ff ff ff ff as a string for fieldBB.  Something has altered the
buffer so that it contains fieldAB, but the Serialized Field list does
not include fieldAB.
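
To make the failure mode concrete, here is a minimal sketch that decodes
both buffers using only the two-entry field list [fieldAA, fieldBB].  It is
NOT Flink's PojoSerializer code: the class and method names are made up for
illustration, and it assumes the string length prefix fits in a single byte
(enough for the buffers shown here).  It only illustrates why a string
reader that lands on ff ff ff ff runs off the end of the buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Flink's actual serializer.
class PojoBufferSketch {

    // Buffer as written at Time1: flag, then (is_null, len+1, chars) per field.
    static final byte[] WRITTEN = {
        0x02,
        0x00, 0x03, 0x41, 0x41,
        0x00, 0x03, 0x42, 0x42
    };

    // Buffer as found at read time: an Integer -1 sits between the strings.
    static final byte[] FOUND = {
        0x02,
        0x00, 0x03, 0x41, 0x41,
        0x00, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
        0x00, 0x03, 0x42, 0x42
    };

    // Reads one nullable string field: is_null flag, len+1, then the chars.
    // Assumption: the length prefix is a single unsigned byte.
    static String readStringField(DataInputStream in) throws IOException {
        if (in.readBoolean()) {
            return null; // is_null flag set
        }
        int lenPlusOne = in.readUnsignedByte();
        if (lenPlusOne == 0) {
            return null;
        }
        byte[] chars = new byte[lenPlusOne - 1];
        in.readFully(chars); // throws EOFException if the buffer is too short
        return new String(chars, StandardCharsets.US_ASCII);
    }

    // Decodes the buffer as a sequence of string fields, the way a reader
    // with field list [fieldAA, fieldBB] would. Returns the decoded fields,
    // or the text "EOFException" if the buffer ends early.
    static String decode(byte[] buffer, int stringFieldCount) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(buffer))) {
            in.readByte(); // leading flag byte
            List<String> fields = new ArrayList<>();
            for (int i = 0; i < stringFieldCount; i++) {
                fields.add(readStringField(in));
            }
            return fields.toString();
        } catch (EOFException e) {
            return "EOFException";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(WRITTEN, 2)); // buffer from Time1
        System.out.println(decode(FOUND, 2));   // buffer seen at read time
    }
}
```

In this sketch the first buffer decodes to both strings, while the second
one takes ff as the length prefix for fieldBB, asks for far more bytes than
remain, and fails with an EOFException.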

-- Runtime details and notes --
Flink 1.14.3
Stateful Functions
300 GB savepoint size.
The corruption seems to appear only when a few months elapse between the
write and the read.

Questions:
A) Any insight into the general mechanism related to Pojos and
serialization?
B) What can cause a keystate to be migrated?  Clearly a read does.  What
about just checkpointing over time, or reading keystate with a key that
is "close" to the other key?
C) If a specific key in keystate is deserialized from RocksDB, does Flink
also deserialize other (adjacent?) keys in the same "block" of data?
D) Are there tools for manually editing Flink savepoints?
