Are these limitations documented somewhere @Jark? I couldn't find anything
at a quick glance. If not, then we should update the documentation
accordingly. In particular, the problem of RowData being used as a key
makes FLINK-16998 not easy to work around.
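For anyone who wants to see concretely what changed, here is a minimal
sketch against the Flink 1.11 Row API (the values are made up):

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class RowKindExample {
        public static void main(String[] args) {
            // Before FLINK-16998 a Row only carried its fields; since Flink 1.11
            // every Row also carries a RowKind change flag (+I, -U, +U, -D).
            Row row = Row.of("alice", 42);
            row.setKind(RowKind.UPDATE_AFTER);

            // The row serializer has to encode this flag as well, which is why
            // Row-typed state written by Flink 1.9 no longer matches what
            // Flink 1.11 expects without a migration step.
            System.out.println(row.getKind() + ": " + row);
        }
    }

Since the flag is part of the serialized representation, a Row that is used
as the key of keyed state runs into the key-compatibility restriction
discussed in the quoted thread below.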
Cheers,
Till

On Wed, Dec 30, 2020 at 11:20 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Guenther,
>
> If you are using the old planner in 1.9 and the old planner in 1.11, then
> a state migration is needed because of the newly added RowKind field.
> This is documented in the 1.11 release notes [1].
>
> If you are using the old planner in 1.9 and the blink planner in 1.11,
> the state is not compatible, because the blink planner uses a different
> serializer for the keys and fields, i.e. RowData vs Row.
>
> Actually, the Flink Table/SQL API doesn't provide state compatibility
> across major versions (i.e. 1.9, 1.10, 1.11). This is because it's quite
> difficult to keep state compatible for SQL queries, as the physical plan
> may change when we introduce even a minor optimization, and we may also
> change the state structure to get better performance for operators.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#added-a-changeflag-to-row-type-flink-16998
>
> On Wed, 30 Dec 2020 at 17:47, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Guenther,
>>
>> sorry for overlooking your colleague's email.
>>
>> I think the answer to your problem is twofold. The underlying problem is
>> that your query seems to use `RowData` as a key for some keyed
>> operation. Since changing the key format might entail that keys need to
>> be partitioned differently, Flink does not support changing the key
>> format. That is why Flink also fails if the key format is compatible
>> after migration. There is a small warning about this on the state
>> evolution page [1].
>>
>> The other part of the answer is that Flink does not support strict
>> backwards compatibility for SQL queries, if I am not mistaken (please
>> chime in if this is no longer correct @Timo Walther <twal...@apache.org>
>> and @j...@apache.org <j...@apache.org>). The problem is that queries
>> might result in different execution plans after a version upgrade, which
>> then cannot be mapped to the old state. Admittedly, in this case it
>> should have been possible, but changing the `RowData` type which is used
>> as a key breaks backwards compatibility. A bit confusing is that
>> FLINK-16998 explicitly states that this change does not break backwards
>> compatibility.
>>
>> What you could try as a workaround is Flink's state processor API [2],
>> which allows you to rewrite savepoints.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 30, 2020 at 3:30 AM Guenther Starnberger <fl...@sysfrog.org>
>> wrote:
>>
>>> Hello,
>>>
>>> We're currently running into an issue upgrading the state of an
>>> application to Flink 1.11, and I think this could be caused by a
>>> potential backwards incompatibility that was introduced with Flink
>>> 1.11. A colleague of mine recently posted about it on the users list
>>> (without a response), but I'd like to bring this up here on the dev
>>> list in order to figure out whether that incompatibility is intended
>>> behavior and/or a known issue.
>>>
>>> We're seeing the issue when trying to load the RocksDB state from
>>> Flink 1.9 into Flink 1.11 with an application that uses the Flink
>>> table environment. Immediately after startup,
>>> RocksDBFullRestoreOperation#restoreKVStateMetaData raises a "The new
>>> key serializer must be compatible" exception.
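For context on the exception above: during a RocksDB restore, the snapshot
of the old key serializer is resolved against the new key serializer, and
anything other than "compatible as is" is rejected. Paraphrased (this is a
sketch, not the actual Flink source), the check behaves roughly like this:

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
    import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
    import org.apache.flink.util.StateMigrationException;

    /** Paraphrased sketch of the key-serializer check described above. */
    final class KeyCompatibilityCheckSketch {

        static <K> void checkKeySerializer(
                TypeSerializerSnapshot<K> restoredKeySnapshot,
                TypeSerializer<K> newKeySerializer) throws StateMigrationException {

            TypeSerializerSchemaCompatibility<K> compat =
                    restoredKeySnapshot.resolveSchemaCompatibility(newKeySerializer);

            // For keys, "compatible after migration" is treated just like
            // "incompatible", because migrating the key format might mean
            // that keys need to be partitioned differently.
            if (!compat.isCompatibleAsIs()) {
                throw new StateMigrationException(
                        "The new key serializer must be compatible.");
            }
        }
    }

For non-key state Flink would trigger a state migration at this point; for
keys it fails hard, which is the behavior Guenther describes below.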
>>>
>>> The reason for that seems to be that FLINK-16998 changed the row
>>> serialization format by adding a new Row#getKind field. There's a
>>> legacy mode of the row serializer, but that's only used for reading
>>> the existing snapshot. As a consequence,
>>> RowSerializerSnapshot#resolveOuterSchemaCompatibility will always
>>> return COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9 savepoint
>>> with Flink 1.11.
>>>
>>> The problem is that AbstractRocksDBRestoreOperation#readMetaData
>>> treats "compatible after migration" the same as "incompatible" and
>>> throws a "The new key serializer must be compatible." exception if it
>>> encounters that result.
>>>
>>> Is it expected that the introduction of Row#getKind breaks existing
>>> older state, or is that a bug? So far I have only reproduced this
>>> issue in a somewhat more complex codebase, but in case this is an
>>> unknown issue or not the intended behavior I can try to provide a
>>> small test case (to rule out that anything in our own code triggers
>>> that issue).
>>>
>>> Example of a query that triggers that issue:
>>> https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21
>>>
>>> Full stacktrace:
>>> https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00
>>>
>>> - Guenther
>>>
>>
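Regarding the state processor API workaround suggested earlier in the
thread: below is a rough sketch of what the reading side could look like
with the DataSet-based state processor API in Flink 1.11. The savepoint
path, the operator uid "stateful-op" and the ValueState name/type are made
up for illustration; for a Table/SQL job the actual uids and state
descriptors are generated by the planner, which is a big part of why this
workaround is not easy in practice.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class ReadOldSavepointSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Load the existing (Flink 1.9) savepoint; the paths are hypothetical.
            ExistingSavepoint savepoint = Savepoint.load(
                    env,
                    "hdfs:///savepoints/savepoint-xyz",
                    new RocksDBStateBackend("hdfs:///flink/checkpoints"));

            // Read the keyed state of one operator with the old key/value types,
            // so it can be transformed and written into a new savepoint later.
            DataSet<String> entries =
                    savepoint.readKeyedState("stateful-op", new CountReader());

            entries.print();
        }

        /** Emits one record per key; the state descriptor here is hypothetical. */
        static class CountReader extends KeyedStateReaderFunction<String, String> {

            private transient ValueState<Long> count;

            @Override
            public void open(Configuration parameters) {
                count = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void readKey(String key, Context ctx, Collector<String> out)
                    throws Exception {
                out.collect(key + " -> " + count.value());
            }
        }
    }

The writing side (OperatorTransformation#bootstrapWith plus
Savepoint#create) is left out here, because it would require reproducing
the exact uids and state layout that the new planner expects, which is the
hard part for SQL jobs.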