Hi Ammon Diether,

This is actually not a bug; for logical (and documented) reasons, keys cannot be schema-migrated:
* When storing state / hash-distributing events, the target key group (one out of max parallelism) is calculated from the key hash.
* If you change the key, the hash changes and hence the key group.
* Therefore schema migration is suppressed for keys.

What you can do to solve the situation is to transcode your savepoint by means of the State Processor API [1]: read by means of the old key type, write by means of the new key type.

Also consider: if you change the key type, the semantics of your aggregations etc. probably change as well, so you might run into trouble with data correctness with respect to your business logic.

I hope that helps.

Sincerely
Thias

[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/

From: Ammon Diether <adiet...@gmail.com>
Sent: Wednesday, October 16, 2024 9:00 PM
To: user <user@flink.apache.org>
Subject: [External] Terrible bug related to serialization of Pojos in keystate.

We have encountered a rather rare but very nasty bug in Flink related to serialization of POJOs in keystate.

-- Timeline --
1) Write a specific item to keystate of class C at Time1; no read of that key will happen until step 5.
2) Time elapses.
3) Class C is schema-evolved to include an additional field.
4) Time elapses.
5) When reading the specific item written above, an EOFException is thrown from AbstractRocksDBState.migrateSerializedValue.
6) Reading the item puts Flink into a restart loop of death. Manual intervention is required.
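To illustrate the key-group reasoning from the reply above, here is a minimal sketch of why a changed key type lands state in a different key group. This is a simplified stand-in, not Flink's actual KeyGroupRangeAssignment (the real implementation additionally murmur-hashes key.hashCode() before taking the modulo), but the principle is the same: the group is derived entirely from the key's hash.

```java
public class KeyGroupSketch {

    // Simplified stand-in for Flink's key-group assignment: the group
    // depends only on the key's hash and the max parallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // The "same" logical key before and after a hypothetical key-type change:
        System.out.println(assignToKeyGroup("42", maxParallelism));                // key as String
        System.out.println(assignToKeyGroup(Integer.valueOf(42), maxParallelism)); // key as Integer
        // The two groups generally differ, so state written under the old key
        // type would be looked up in the wrong key group after a key migration.
        // This is why schema migration is suppressed for keys.
    }
}
```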
-- details at the time of writing the value to keystate --

class C {  // at Time1
    private String fieldAA = "AA";
    private String fieldBB = "BB";
}

The serialized buffer looks like so:

    02             flag
    00 03 41 41    is_null, len+1, 'A', 'A'
    00 03 42 42    is_null, len+1, 'B', 'B'

Serialized field list is: [fieldAA, fieldBB]

-- schema evolution --

class C {  // at Time3
    private String fieldAA = "AA";
    private Integer fieldAB = -1;
    private String fieldBB = "BB";
}

-- details at the time of reading the value from keystate --

The serialized buffer looks like so:

    02             flag
    00 03 41 41    is_null, len+1, 'A', 'A'
    00 ff ff ff ff is_null, -1
    00 03 42 42    is_null, len+1, 'B', 'B'

Serialized field list is: [fieldAA, fieldBB]

When reading the buffer, Flink reads fieldAA just fine; it then attempts to read ff ff ff ff as a string for fieldBB. Something has altered the buffer so that it contains fieldAB, but the serialized field list does not include fieldAB.

-- Runtime details and notes --

Flink 1.14.3, Stateful Functions, 300 GB savepoint size. The total time elapsed between write and read seems to need to be a few months for this corruption to happen.

Questions:
A) Any insight into the general mechanism related to POJOs and serialization?
B) What can cause a keystate to be migrated? Clearly a read does; what about just checkpointing over time? How about reading keystate with a key that is "close" to the other key?
C) If a specific key in keystate is deserialized from RocksDB, does Flink deserialize other (adjacent?) keys in the "block" of data?
D) Are there tools for manually editing Flink savepoints?
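The misread described in the details above can be reproduced with a small sketch. This is NOT Flink's PojoSerializer, just an illustration built from the byte layout given in the email (flag byte, then per string field: is_null byte, len+1, chars): reading the evolved buffer against the stale two-field list [fieldAA, fieldBB] takes fieldAB's 0xff byte as a string length and runs off the end of the buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StaleSchemaReadSketch {

    // Buffers exactly as shown in the email.
    static final byte[] OLD_BUFFER = {
        0x02,                         // flag
        0x00, 0x03, 0x41, 0x41,      // fieldAA: is_null, len+1, 'A', 'A'
        0x00, 0x03, 0x42, 0x42       // fieldBB: is_null, len+1, 'B', 'B'
    };
    static final byte[] EVOLVED_BUFFER = {
        0x02,                         // flag
        0x00, 0x03, 0x41, 0x41,      // fieldAA
        0x00, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, // fieldAB = -1 (int)
        0x00, 0x03, 0x42, 0x42       // fieldBB
    };

    // Reads one String field using the "is_null, len+1, chars" layout from the email.
    static String readStringField(DataInputStream in) throws IOException {
        if (in.readUnsignedByte() != 0) {
            return null;
        }
        int len = in.readUnsignedByte() - 1;
        byte[] chars = new byte[len];
        in.readFully(chars);          // throws EOFException if the buffer is too short
        return new String(chars, StandardCharsets.US_ASCII);
    }

    // Reads a buffer against the stale field list [fieldAA, fieldBB] (two Strings).
    static String readWithStaleFieldList(byte[] buf) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf))) {
            in.readUnsignedByte();    // flag
            return readStringField(in) + "," + readStringField(in);
        } catch (EOFException e) {
            // 0xff is taken as len+1 = 255, so the reader tries to pull 254
            // bytes out of a 7-byte remainder and hits end-of-stream.
            return "EOFException";
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithStaleFieldList(OLD_BUFFER));      // both fields parse cleanly
        System.out.println(readWithStaleFieldList(EVOLVED_BUFFER));  // fails like migrateSerializedValue
    }
}
```

The old buffer decodes to "AA" and "BB"; the evolved buffer hits EOFException on the second field, matching the failure mode reported in step 5 of the timeline.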