[ 
https://issues.apache.org/jira/browse/FLINK-8715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452492#comment-16452492
 ] 

ASF GitHub Bot commented on FLINK-8715:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184104126
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -185,36 +188,44 @@ public HeapKeyedStateBackend(
                                
stateName.equals(stateTable.getMetaInfo().getName()),
                                "Incompatible state names. " +
                                        "Was [" + 
stateTable.getMetaInfo().getName() + "], " +
    -                                   "registered with [" + 
newMetaInfo.getName() + "].");
    +                                   "registered with [" + stateName + "].");
     
    -                   if 
(!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                   if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
                                        && 
!stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
     
                                Preconditions.checkState(
    -                                   
newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
    +                                   
stateType.equals(stateTable.getMetaInfo().getStateType()),
                                        "Incompatible state types. " +
                                                "Was [" + 
stateTable.getMetaInfo().getStateType() + "], " +
    -                                           "registered with [" + 
newMetaInfo.getStateType() + "].");
    +                                           "registered with [" + stateType 
+ "].");
                        }
     
                        @SuppressWarnings("unchecked")
                        RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V> 
restoredMetaInfo =
                                
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, V>) 
restoredKvStateMetaInfos.get(stateName);
     
                        // check compatibility results to determine if state 
migration is required
    +                   TypeSerializer<N> newNamespaceSerializer = 
namespaceSerializer.duplicate();
                        CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
                                        
restoredMetaInfo.getNamespaceSerializer(),
                                        null,
                                        
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                                   newMetaInfo.getNamespaceSerializer());
    +                                   newNamespaceSerializer);
     
    +                   TypeSerializer<V> newValueSerializer = 
valueSerializer.duplicate();
    --- End diff --
    
    Similar to my comment on the Rocks code, this duplicate seems redundant, 
because the serializer also comes straight from a `StateDescriptor` which 
duplicates before handing it out. One thing to consider when removing this 
call: it is just less obvious here that the serializer was already duplicated, 
so maybe it would be good to pass the state descriptor as argument and get the 
serializer directly here to avoid any surprises for people working on this in 
the future.


> RocksDB does not propagate reconfiguration of serializer to the states
> ----------------------------------------------------------------------
>
>                 Key: FLINK-8715
>                 URL: https://issues.apache.org/jira/browse/FLINK-8715
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Arvid Heise
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Any changes to the serializer done in #ensureCompability are lost during the 
> state creation.
> In particular, 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68]
>  always uses a fresh copy of the StateDescriptor.
> An easy fix is to pass the reconfigured serializer as an additional parameter 
> in 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681]
>  , which can be retrieved through the side-output of getColumnFamily
> {code:java}
> kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer()
> {code}
> I encountered it in 1.3.2 but the code in the master seems unchanged (hence 
> the pointer into master). I encountered it in ValueState, but I suspect the 
> same issue can be observed for all kinds of RocksDB states.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to