Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184104126
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
                                
stateName.equals(stateTable.getMetaInfo().getName()),
                                "Incompatible state names. " +
                                        "Was [" + 
stateTable.getMetaInfo().getName() + "], " +
    -                                   "registered with [" + 
newMetaInfo.getName() + "].");
    +                                   "registered with [" + stateName + "].");
     
    -                   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                   if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
                                        && 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
     
                                Preconditions.checkState(
    -                                   
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +                                   
stateType.equals(stateTable.getMetaInfo().getStateType()),
                                        "Incompatible state types. " +
                                                "Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
    -                                           "registered with [" + 
newMetaInfo.getStateType() + "].");
    +                                           "registered with [" + stateType 
+ "].");
                        }
     
                        @SuppressWarnings("unchecked")
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> 
restoredMetaInfo =
                                
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) 
restoredKvStateMetaInfos.get(stateName);
     
                        // check compatibility results to determine if state 
migration is required
    +                   TypeSerializer<N> newNamespaceSerializer = 
namespaceSerializer.duplicate();
                        CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
                                        
restoredMetaInfo.getNamespaceSerializer(),
                                        null,
                                        
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                                   newMetaInfo.getNamespaceSerializer());
    +                                   newNamespaceSerializer);
     
    +                   TypeSerializer<V> newValueSerializer = 
valueSerializer.duplicate();
    --- End diff --
    
    Similar to my comment on the Rocks code, this duplicate seems redundant, 
because the serializer also comes straight from a `StateDescriptor` which 
duplicates before handing it out. One thing to consider when removing this 
call: it is just less obvious here that the serializer was already duplicated, 
so maybe it would be good to pass the state descriptor as argument and get the 
serializer directly here to avoid any surprises for people working on this in 
the future.


---

Reply via email to