[ https://issues.apache.org/jira/browse/FLINK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711678#comment-16711678 ]
ASF GitHub Bot commented on FLINK-11087:
----------------------------------------

tzulitai opened a new pull request #7256: [FLINK-11087] [state] Incorrect K/V serializer association when reading broadcast from 1.5.x snapshots
URL: https://github.com/apache/flink/pull/7256

## What is the purpose of the change

This fixes a bug that prevents Flink 1.6.x (up to the latest, 1.6.2) and 1.7.0 from successfully restoring broadcast state from savepoints taken with 1.5.x.

The problem is that when restoring a broadcast state's meta information from a 1.5.x savepoint, `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3` incorrectly treats the first restored serializer as the value serializer and the second as the key serializer. It should be the other way around: the first restored serializer is the key serializer and the second is the value serializer.

This PR also updates `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test, and regenerates the test savepoints in `release-1.5` and `release-1.6`. That migration ITCase failed to catch this bug because its K/V types were identical (and therefore so were the serializers), so the swapped association did not affect the test result.

## Brief change log

- Fix the K/V serializer association in `LegacyStateMetaInfoReaders.OperatorBackendStateMetaInfoReaderV2V3`.
- Update `StatefulJobWBroadcastStateMigrationITCase` to use different K/V types for the broadcast states under test.
- Regenerate the test savepoints for `StatefulJobWBroadcastStateMigrationITCase` on the `release-1.6` and `release-1.5` branches.
- Amend the compatibility table in the docs (https://ci.apache.org/projects/flink/flink-docs-master/ops/upgrading.html#compatibility-table) to document the issue.

## Verifying this change

The updated `StatefulJobWBroadcastStateMigrationITCase` should pass.
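To make the ordering bug and the ITCase blind spot concrete, here is a minimal, self-contained sketch. It is a hypothetical model, not Flink code: `LegacySerializerOrderDemo`, `BroadcastMetaInfo`, `readBuggy` and `readFixed` are illustrative names, and serializers are represented only by their class names. It assumes, as the PR states, that a 1.5.x snapshot lists the key serializer first and the value serializer second.

```java
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical model of the FLINK-11087 ordering bug. None of these names are Flink APIs;
 * serializers are represented by their class names only.
 * Assumption (from the PR text): a 1.5.x snapshot lists the key serializer first and
 * the value serializer second.
 */
public class LegacySerializerOrderDemo {

    /** Simplified stand-in for the restored broadcast state meta info. */
    static final class BroadcastMetaInfo {
        final String keySerializer;
        final String valueSerializer;

        BroadcastMetaInfo(String keySerializer, String valueSerializer) {
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof BroadcastMetaInfo)) {
                return false;
            }
            BroadcastMetaInfo other = (BroadcastMetaInfo) o;
            return keySerializer.equals(other.keySerializer)
                    && valueSerializer.equals(other.valueSerializer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(keySerializer, valueSerializer);
        }

        @Override
        public String toString() {
            return "key=" + keySerializer + ", value=" + valueSerializer;
        }
    }

    /** Buggy association: the first restored serializer is taken as the value serializer. */
    static BroadcastMetaInfo readBuggy(List<String> restoredInOrder) {
        return new BroadcastMetaInfo(restoredInOrder.get(1), restoredInOrder.get(0));
    }

    /** Fixed association: the first restored serializer is the key serializer. */
    static BroadcastMetaInfo readFixed(List<String> restoredInOrder) {
        return new BroadcastMetaInfo(restoredInOrder.get(0), restoredInOrder.get(1));
    }

    public static void main(String[] args) {
        // Distinct K/V types (as in the reporter's job): the swap is visible.
        List<String> distinct = List.of("StringSerializer", "JsonSerializer");
        System.out.println("buggy: " + readBuggy(distinct)); // buggy: key=JsonSerializer, value=StringSerializer
        System.out.println("fixed: " + readFixed(distinct)); // fixed: key=StringSerializer, value=JsonSerializer

        // Identical K/V types (as in the original ITCase): the swap is invisible.
        List<String> identical = List.of("StringSerializer", "StringSerializer");
        System.out.println(readBuggy(identical).equals(readFixed(identical))); // true
    }
}
```

The last line shows why a test with identical key and value types cannot distinguish the buggy reader from the fixed one, which is why the ITCase was changed to use different K/V types.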
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / no / don't know)

> Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
> -------------------------------------------------------------
>
>                 Key: FLINK-11087
>                 URL: https://issues.apache.org/jira/browse/FLINK-11087
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.2, 1.7.0
>         Environment: Migration from Flink 1.5.3 to Flink 1.7.0
>            Reporter: Edward Rojas
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: Migration, State, broadcast, pull-request-available
>             Fix For: 1.6.3, 1.7.1
>
>
> When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast
> state throws the following error:
> {noformat}
> org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
>   at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The broadcast state uses a MapState with StringSerializer as the key
> serializer and a custom JsonSerializer as the value serializer. There were no
> changes in the TypeSerializers used; only the Flink version was upgraded.
>
> With some debugging, I see that when the DefaultOperatorStateBackend class
> validates the compatibility of states, "*registeredBroadcastStates*", which
> contains the data about the 'old' state, holds the wrong association of key
> and value serializers: JsonSerializer appears as the key serializer and
> StringSerializer appears as the value serializer (it should be the contrary).
>
> After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3"
> class is responsible for this swap here:
> https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165
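The failure in the report can be reproduced in miniature with the sketch below. It is a hypothetical, heavily simplified model of the restore-time check, not Flink's `DefaultOperatorStateBackend`: "compatible" here just means the serializer names are equal, `IllegalStateException` stands in for Flink's `StateMigrationException`, and `BroadcastRestoreCheckDemo`, `SerializerPair`, `checkBroadcastRestore` and `tryRestore` are illustrative names. It shows how a swapped key/value association makes the key check fail even though the job's serializers never changed.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified model of the restore-time check that failed in the report.
 * This is NOT Flink's DefaultOperatorStateBackend: "compatible" here just means the
 * serializer names are equal, and IllegalStateException stands in for Flink's
 * StateMigrationException.
 */
public class BroadcastRestoreCheckDemo {

    /** Hypothetical (key, value) serializer pair, identified by class name only. */
    static final class SerializerPair {
        final String key;
        final String value;

        SerializerPair(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Compares the serializers requested by the new job against the restored ones. */
    static void checkBroadcastRestore(SerializerPair restored, SerializerPair requested) {
        if (!Objects.equals(restored.key, requested.key)) {
            // Same wording as the exception in the reported stack trace.
            throw new IllegalStateException(
                    "The new key serializer for broadcast state must not be incompatible.");
        }
        if (!Objects.equals(restored.value, requested.value)) {
            // Hypothetical message for the value-side check.
            throw new IllegalStateException("Value serializer mismatch for broadcast state.");
        }
    }

    /** Returns the failure message, or null if the restore check passes. */
    static String tryRestore(SerializerPair restored, SerializerPair requested) {
        try {
            checkBroadcastRestore(restored, requested);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The job registers String keys and custom JSON values, unchanged across the upgrade.
        SerializerPair requested = new SerializerPair("StringSerializer", "JsonSerializer");

        // Meta info as mis-read by the buggy legacy reader: key and value swapped.
        SerializerPair swapped = new SerializerPair("JsonSerializer", "StringSerializer");
        // prints: The new key serializer for broadcast state must not be incompatible.
        System.out.println(tryRestore(swapped, requested));

        // Meta info as read after the fix: the check passes.
        SerializerPair correct = new SerializerPair("StringSerializer", "JsonSerializer");
        System.out.println(tryRestore(correct, requested)); // prints: null
    }
}
```

Because the key check runs first, the swap surfaces as a key serializer error, matching the message in the reported stack trace.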