Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184104126

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
                     stateName.equals(stateTable.getMetaInfo().getName()),
                     "Incompatible state names. " +
                         "Was [" + stateTable.getMetaInfo().getName() + "], " +
    -                    "registered with [" + newMetaInfo.getName() + "].");
    +                    "registered with [" + stateName + "].");
    -            if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +            if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
                     && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
                     Preconditions.checkState(
    -                    newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +                    stateType.equals(stateTable.getMetaInfo().getStateType()),
                         "Incompatible state types. " +
                             "Was [" + stateTable.getMetaInfo().getStateType() + "], " +
    -                        "registered with [" + newMetaInfo.getStateType() + "].");
    +                        "registered with [" + stateType + "].");
                 }
                 @SuppressWarnings("unchecked")
                 RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo =
                     (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName);
                 // check compatibility results to determine if state migration is required
    +            TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate();
                 CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                     restoredMetaInfo.getNamespaceSerializer(),
                     null,
                     restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                newMetaInfo.getNamespaceSerializer());
    +                newNamespaceSerializer);
    +            TypeSerializer<V> newValueSerializer = valueSerializer.duplicate();
    --- End diff --

Similar to my comment on the Rocks code, this duplicate seems redundant, because the serializer also comes straight from a `StateDescriptor` which duplicates before handing it out.
One thing to consider when removing this call: it is less obvious here that the serializer was already duplicated, so it might be good to pass the state descriptor as an argument and get the serializer from it directly here, to avoid any surprises for people working on this code in the future.
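
To make the suggestion concrete, here is a minimal, self-contained sketch of the idea. It deliberately does not use Flink's classes: `SketchSerializer`, `CountingSerializer`, `SketchDescriptor`, and both `register...` methods are hypothetical stand-ins, and the only behavior assumed from the comment above is that the descriptor duplicates its serializer before handing it out.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DescriptorDuplicateSketch {

    /** Hypothetical stand-in for a serializer that must be duplicated per user. */
    interface SketchSerializer {
        SketchSerializer duplicate();
    }

    /** Counts how many copies are made, so redundant duplicates become visible. */
    static final class CountingSerializer implements SketchSerializer {
        static final AtomicInteger COPIES = new AtomicInteger();

        @Override
        public SketchSerializer duplicate() {
            COPIES.incrementAndGet();
            return new CountingSerializer();
        }
    }

    /** Hypothetical stand-in for a descriptor that duplicates before handing out. */
    static final class SketchDescriptor {
        private final SketchSerializer serializer;

        SketchDescriptor(SketchSerializer serializer) {
            this.serializer = serializer;
        }

        SketchSerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    /** Takes a bare serializer: cannot tell whether it is already a private copy, so it copies again. */
    static SketchSerializer registerFromSerializer(SketchSerializer maybeShared) {
        return maybeShared.duplicate();
    }

    /** Takes the descriptor: the single duplicate happens at an obvious place. */
    static SketchSerializer registerFromDescriptor(SketchDescriptor descriptor) {
        return descriptor.getSerializer();
    }

    public static void main(String[] args) {
        SketchDescriptor descriptor = new SketchDescriptor(new CountingSerializer());

        CountingSerializer.COPIES.set(0);
        registerFromSerializer(descriptor.getSerializer());
        System.out.println("copies via bare serializer: " + CountingSerializer.COPIES.get());

        CountingSerializer.COPIES.set(0);
        registerFromDescriptor(descriptor);
        System.out.println("copies via descriptor: " + CountingSerializer.COPIES.get());
    }
}
```

With the descriptor as the argument, the callee no longer has to guess whether the caller already made a copy, which is the "surprise" the comment is trying to avoid.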
---