Hi Ammon Diether,

This is actually not a bug. For logical (and documented) reasons, keys cannot 
be schema-migrated:

  *   When storing state or hash-distributing events, the target key group (one 
out of max parallelism) is calculated from the key hash.
  *   If you change the key, its hash changes, and with it the key group.
  *   Therefore schema migration is suppressed for keys.
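
The reasoning in the list above can be sketched in a few lines of Java. This is a simplified model, not Flink's actual code: the class KeyGroupSketch, the scramble() mixer, and the example keys are hypothetical, and Flink applies its own murmur hash to key.hashCode() before taking the modulo. It only shows that the key group, and therefore the owning subtask, is a pure function of the key's hash.

```java
// Simplified model of key-group assignment; NOT Flink's actual implementation.
final class KeyGroupSketch {

    // Illustrative stand-in for the hash-scrambling step (assumption:
    // Flink uses a murmur hash here; this mixer only mimics the idea).
    static int scramble(int hash) {
        hash ^= hash >>> 16;
        hash *= 0x85ebca6b;
        hash ^= hash >>> 13;
        return hash & Integer.MAX_VALUE; // force a non-negative result
    }

    // Key group = scrambled hash of the key, modulo max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    // Each parallel subtask owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        Object oldKey = "customer-42";     // hypothetical key of the old type
        Object newKey = Long.valueOf(42L); // hypothetical key of the new type
        for (Object key : new Object[] {oldKey, newKey}) {
            int group = keyGroupFor(key, maxParallelism);
            System.out.println(key.getClass().getSimpleName() + " key -> key group "
                    + group + ", subtask "
                    + subtaskFor(group, maxParallelism, parallelism));
        }
    }
}
```

Because nothing but the hash decides where a key's state lives, a key whose type or content changes may hash into a different key group, and the state stored under the old group would no longer be found.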

What you can do to solve the situation is to transcode your savepoint with the 
State Processor API [1]: read the state using the old key type and write it 
using the new key type.
Also consider that changing the key type probably changes the semantics of 
your aggregations etc. as well, so you might run into data correctness 
problems with respect to your business logic.


I hope that helps.

Sincerely

Thias


[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/

From: Ammon Diether <adiet...@gmail.com>
Sent: Wednesday, October 16, 2024 9:00 PM
To: user <user@flink.apache.org>
Subject: [External] Terrible bug related to serialization of Pojos in keystate.

We have encountered a rather rare, but very nasty bug with Flink related to 
serialization of Pojos in keystate.

-- Timeline --
1) Write a specific item to keystate of class C at Time1, no read of that key 
will happen until step 5.
2) Time elapses
3) class C is schema evolved to include an additional field
4) Time elapses
5) When reading the specific item written above, we get an EOFException 
thrown to AbstractRocksDBState.migrateSerializedValue
6) Reading the item puts Flink into a restart loop of death.  Manual 
intervention is required.

-- details at the time of writing the value to keystate --
class C {  // at Time1
 private String fieldAA = "AA";
 private String fieldBB = "BB";
}

The serialized buffer looks like so:
  02           flag
  00 03 41 41  is_null, len+1, 'A', 'A'
  00 03 42 42  is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

-- schema evolution --
class C {  // at Time3
 private String fieldAA = "AA";
 private Integer fieldAB = -1;
 private String fieldBB = "BB";
}

-- details at the time of reading the value from keystate --
The serialized buffer looks like so:
  02              flag
  00 03 41 41     is_null, len+1, 'A', 'A'
  00 ff ff ff ff  is_null, -1
  00 03 42 42     is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]

When reading the buffer, Flink reads fieldAA just fine, then attempts to read 
ff ff ff ff as a string for fieldBB.  Something has altered the buffer so that 
it contains fieldAB, but the Serialized Field list does not include fieldAB.
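
The failure mode can be reproduced with a small stand-alone decoder. This is only a sketch of the layout described above (flag byte, then is_null plus a len+1-prefixed string per field), not Flink's serializer code: the class PojoBufferSketch and its methods are hypothetical, and the variable-length handling of the length prefix is an assumption about how the string length is encoded.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Simplified reader for the buffer layout described in this message.
final class PojoBufferSketch {

    // Reads a "len+1" prefix (assumed variable-length, 7 bits per byte,
    // high bit = more bytes follow), then one byte per ASCII character.
    static String readString(DataInputStream in) throws IOException {
        int len = in.readUnsignedByte();
        if (len == 0) {
            return null; // a prefix of 0 would mean a null string
        }
        if (len >= 0x80) {
            int shift = 7;
            int curr;
            len &= 0x7f;
            while ((curr = in.readUnsignedByte()) >= 0x80) {
                len |= (curr & 0x7f) << shift;
                shift += 7;
            }
            len |= curr << shift;
        }
        len -= 1;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < len; i++) {
            // readUnsignedByte throws EOFException once the buffer is exhausted
            sb.append((char) in.readUnsignedByte());
        }
        return sb.toString();
    }

    // Reads the flag byte, then an is_null byte and a string per expected field.
    static String[] readStringFields(byte[] buffer, int fieldCount) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
        in.readUnsignedByte(); // flag
        String[] fields = new String[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            boolean isNull = in.readBoolean();
            fields[i] = isNull ? null : readString(in);
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        // Buffer as written at Time1: [fieldAA, fieldBB]
        byte[] time1 = {0x02, 0x00, 0x03, 0x41, 0x41, 0x00, 0x03, 0x42, 0x42};
        // Buffer as found at read time: it contains the extra Integer -1
        byte[] atRead = {0x02, 0x00, 0x03, 0x41, 0x41,
                0x00, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
                0x00, 0x03, 0x42, 0x42};
        System.out.println(String.join(",", readStringFields(time1, 2)));
        try {
            readStringFields(atRead, 2);
        } catch (EOFException e) {
            System.out.println("EOFException while reading fieldBB");
        }
    }
}
```

In this model, the ff bytes are interpreted as a very large string length, so the reader consumes the rest of the buffer and runs off its end, which is consistent with the EOFException we see.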

-- Runtime details and notes --
Flink 1.14.3
stateful functions
300 GB savepoint size.
The total time elapsed between write and read seems to need to be a few months 
for this corruption to happen.

Questions:
A) Any insight into the general mechanism related to Pojos and serialization?
B) What can cause a keystate to be migrated?  Clearly a read does, but what 
about just checkpointing over time, or reading keystate with a key that is 
"close" to the other key?
C) If a specific key in keystate is deserialized from RocksDB, does Flink 
deserialize other (adjacent?) keys in the "block" of data?
D) Are there tools for manually editing Flink savepoints?