Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184098081 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -185,36 +188,44 @@ public HeapKeyedStateBackend( stateName.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. " + "Was [" + stateTable.getMetaInfo().getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + stateName + "]."); - if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + if (!stateType.equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + stateType.equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. " + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + stateType + "]."); } @SuppressWarnings("unchecked") RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName); // check compatibility results to determine if state migration is required + TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate(); --- End diff -- Just curious, why do we need to duplicate the serializer here but not in all other places like where `resolveCompatibilityResult()` is called? Or asked differently, should `resolveCompatibilityResult()` always do duplication internally or not at all or is this just as intended?
---