Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184657584 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -185,36 +188,44 @@ public HeapKeyedStateBackend( stateName.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. " + "Was [" + stateTable.getMetaInfo().getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + stateName + "]."); - if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + if (!stateType.equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), + stateType.equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. " + "Was [" + stateTable.getMetaInfo().getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + stateType + "]."); } @SuppressWarnings("unchecked") RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> restoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) restoredKvStateMetaInfos.get(stateName); // check compatibility results to determine if state migration is required + TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate(); CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + newNamespaceSerializer); + TypeSerializer<V> newValueSerializer = valueSerializer.duplicate(); --- End diff -- I've updated this according to your suggestion: The reconfiguration method now gets a state descriptor as an argument to reduce confusion.
---