Are these limitations documented somewhere, @Jark? I couldn't find anything
on a quick search. If not, then we should update the documentation
accordingly. In particular, the fact that RowData is used as a key makes
FLINK-16998 hard to work around.

Cheers,
Till

On Wed, Dec 30, 2020 at 11:20 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Guenther,
>
> If you are using the old planner in 1.9 and the old planner in 1.11, then a
> state migration is needed because of the newly added RowKind field. This is
> documented in the 1.11 release notes [1].
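>
> As a small illustration (a hedged sketch of the public Row API only, not of
> the planner-internal serializers), this is the change flag that FLINK-16998
> added and that the row serializer now has to persist:
>
>     import org.apache.flink.types.Row;
>     import org.apache.flink.types.RowKind;
>
>     public class RowKindExample {
>         public static void main(String[] args) {
>             Row row = new Row(2);
>             row.setField(0, "user-1");
>             row.setField(1, 42L);
>             // Since 1.11 every Row carries a RowKind change flag in addition
>             // to its fields; restoring pre-1.11 state therefore requires a
>             // serializer migration.
>             row.setKind(RowKind.INSERT);
>             System.out.println(row.getKind()); // prints INSERT
>         }
>     }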
>
> If you are using the old planner in 1.9 and the blink planner in 1.11, the
> state is not compatible, because the blink planner uses different
> serializers for the keys and fields (RowData vs. Row).
>
> Actually, the Flink Table/SQL API doesn't provide state compatibility across
> major versions (i.e. 1.9, 1.10, 1.11). It is quite difficult to keep state
> compatible for SQL queries, because the physical plan may change when we
> introduce even a minor optimization, and we may also change the state
> structure to improve operator performance.
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#added-a-changeflag-to-row-type-flink-16998
>
> On Wed, 30 Dec 2020 at 17:47, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Guenther,
>>
>> sorry for overlooking your colleague's email.
>>
>> I think the answer to your problem is twofold. The underlying issue is that
>> your query seems to use `RowData` as a key for some keyed operation. Since
>> changing the key format might require keys to be partitioned differently,
>> Flink does not support changing the key format at all. That is why Flink
>> also fails if the key format is only compatible after migration. There is a
>> small warning about this on the state evolution page [1].
>>
>> The other part of the answer is that, if I am not mistaken, Flink does not
>> support strict backwards compatibility for SQL queries (please chime in if
>> this is no longer correct @Timo Walther <twal...@apache.org> and
>> @j...@apache.org <j...@apache.org>). The problem is that queries might
>> result in different execution plans after a version upgrade, which then
>> cannot be mapped to the old state. Admittedly, in this case it should have
>> been possible, but changing the `RowData` type, which is used as a key,
>> breaks backwards compatibility. What is a bit confusing is that FLINK-16998
>> explicitly states that this change does not break backwards compatibility.
>>
>> What you could try as a workaround is Flink's State Processor API [2],
>> which allows you to rewrite savepoints (see the sketch below).
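>>
>> To make that a bit more concrete, here is a minimal sketch of reading keyed
>> state from an existing savepoint with the State Processor API (Flink 1.11,
>> DataSet-based). The operator uid, key type and state descriptor below are
>> made-up placeholders; for a Table/SQL job the hard part is discovering the
>> auto-generated operator IDs and the internal (RowData-based) state layout.
>>
>>     import org.apache.flink.api.common.state.ValueState;
>>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>>     import org.apache.flink.api.java.DataSet;
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>>     import org.apache.flink.state.api.ExistingSavepoint;
>>     import org.apache.flink.state.api.Savepoint;
>>     import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
>>     import org.apache.flink.util.Collector;
>>
>>     public class ReadOldSavepoint {
>>         public static void main(String[] args) throws Exception {
>>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>             // Placeholder state backend; pick the one matching your job
>>             // (RocksDB in this thread). The savepoint path is made up.
>>             ExistingSavepoint savepoint = Savepoint.load(
>>                     env, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());
>>
>>             // "my-uid", the key type and the state name are hypothetical.
>>             DataSet<Long> counts = savepoint.readKeyedState("my-uid", new CountReader());
>>             counts.print();
>>         }
>>
>>         static class CountReader extends KeyedStateReaderFunction<String, Long> {
>>             private ValueState<Long> count;
>>
>>             @Override
>>             public void open(Configuration parameters) {
>>                 count = getRuntimeContext().getState(
>>                         new ValueStateDescriptor<>("count", Long.class));
>>             }
>>
>>             @Override
>>             public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
>>                 out.collect(count.value());
>>             }
>>         }
>>     }
>>
>> The rewritten state could then be written back into a new savepoint via
>> OperatorTransformation#bootstrapWith and Savepoint#create, but that again
>> requires knowing the exact state layout the SQL operators expect.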
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> Cheers,
>> Till
>>
>> On Wed, Dec 30, 2020 at 3:30 AM Guenther Starnberger <fl...@sysfrog.org>
>> wrote:
>>
>>> Hello,
>>>
>>> We're currently running into an issue when upgrading the state of an
>>> application to Flink 1.11, and I think it could be caused by a backwards
>>> incompatibility introduced in Flink 1.11. A colleague of mine recently
>>> posted about it on the users list (without a response), but I'd like to
>>> bring it up here on the dev list to figure out whether that
>>> incompatibility is intended behavior and/or a known issue.
>>>
>>> We're seeing the issue when trying to load the RocksDB state from Flink
>>> 1.9 into Flink 1.11 with an application that uses the Flink table
>>> environment. Immediately after startup,
>>> RocksDBFullRestoreOperation#restoreKVStateMetaData throws a "The new key
>>> serializer must be compatible." exception.
>>>
>>> The reason seems to be that FLINK-16998 changed the row serialization
>>> format by adding a new Row#getKind field. There is a legacy mode of the
>>> row serializer, but it is only used for reading the existing snapshot. As
>>> a consequence, RowSerializerSnapshot#resolveOuterSchemaCompatibility will
>>> always return COMPATIBLE_AFTER_MIGRATION when reading a Flink 1.9
>>> savepoint with Flink 1.11.
>>>
>>> The problem is that AbstractRocksDBRestoreOperation#readMetaData
>>> treats "compatible after migration" the same as "incompatible" and
>>> throws a "The new key serializer must be compatible." exception if it
>>> encounters that result.
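>>>
>>> For reference, the restore-time check behaves roughly like the following
>>> (a paraphrased sketch from memory, not the exact 1.11 source):
>>>
>>>     import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
>>>     import org.apache.flink.util.StateMigrationException;
>>>
>>>     public class KeyCompatCheck {
>>>         // Keys cannot be migrated or repartitioned in place, so a key
>>>         // serializer that is only "compatible after migration" is rejected
>>>         // just like an incompatible one.
>>>         static void checkKeySerializer(TypeSerializerSchemaCompatibility<?> keyCompat)
>>>                 throws StateMigrationException {
>>>             if (keyCompat.isIncompatible() || keyCompat.isCompatibleAfterMigration()) {
>>>                 throw new StateMigrationException("The new key serializer must be compatible.");
>>>             }
>>>         }
>>>     }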
>>>
>>> Is it expected that the introduction of Row#getKind breaks existing older
>>> state, or is that a bug? So far I have only reproduced this issue in a
>>> somewhat more complex codebase, but if this is an unknown issue or not the
>>> intended behavior, I can try to provide a small test case (to rule out
>>> that anything in our own code triggers it).
>>>
>>> Example of a query that triggers that issue:
>>> https://gist.github.com/gstarnberger/f19a30df179c72a622490cbb041adb21
>>>
>>> Full stacktrace:
>>> https://gist.github.com/gstarnberger/336d94e723b3ec599d09e17dd7d0ee00
>>>
>>> - Guenther
>>>
>>
