Hello,

We're currently running into an issue upgrading the state of an application to Flink 1.11, and I think it is caused by a backwards incompatibility introduced in Flink 1.11. A colleague of mine recently posted about it on the users list (without a response), but I'd like to bring it up here on the dev list to figure out whether this incompatibility is intended behavior and/or a known issue.
We're seeing the issue when trying to load RocksDB state from Flink 1.9 into Flink 1.11 with an application that uses the Flink table environment. Immediately after startup, RocksDBFullRestoreOperation#restoreKVStateMetaData raises a "The new key serializer must be compatible" exception.

The reason seems to be that FLINK-16998 changed the row serialization format by adding the new Row#getKind field. There is a legacy mode of the row serializer, but it is only used for reading the existing snapshot. As a consequence, RowSerializerSnapshot#resolveOuterSchemaCompatibility always returns COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9 savepoint with Flink 1.11. The problem is that AbstractRocksDBRestoreOperation#readMetaData treats "compatible after migration" the same as "incompatible" and throws a "The new key serializer must be compatible." exception when it encounters that result. (I've put a rough sketch of that check at the end of this mail.)

Is it expected that the introduction of Row#getKind breaks existing older state, or is this a bug? So far I have only reproduced the issue in a somewhat more complex codebase, but if this is an unknown issue or not the intended behavior, I can try to provide a small test case (to rule out that anything in our own code triggers it).

Example of a query that triggers the issue: https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21

Full stacktrace: https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00

- Guenther
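
P.S. To make the failure mode concrete, here is a rough sketch of the key serializer check that rejects the restore. The class and method names are made up for illustration, and the body is only modeled on how I read AbstractRocksDBRestoreOperation#readMetaData, not copied from the Flink sources:

import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.util.StateMigrationException;

// Simplified illustration of the key serializer check during a RocksDB restore.
// Modeled on AbstractRocksDBRestoreOperation#readMetaData; not the actual Flink code.
final class KeySerializerCompatibilityCheck {

    static <K> void checkKeySerializer(
            TypeSerializerSchemaCompatibility<K> keySerializerCompatibility)
            throws StateMigrationException {
        // As far as I can tell, key state cannot be migrated on restore (unlike value
        // state), so only "compatible as is" (or a reconfigured serializer) passes.
        // COMPATIBLE_AFTER_MIGRATION therefore fails the restore just like
        // INCOMPATIBLE does, which is what we hit once the RowSerializer reports
        // COMPATIBLE_AFTER_MIGRATION for the Flink 1.9 savepoint.
        if (keySerializerCompatibility.isCompatibleAfterMigration()
                || keySerializerCompatibility.isIncompatible()) {
            throw new StateMigrationException("The new key serializer must be compatible.");
        }
    }
}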