[ 
https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725646#comment-15725646
 ] 

ASF GitHub Bot commented on FLINK-5041:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2781#discussion_r91087211
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -346,4 +359,130 @@ public String toString() {
                return "HeapKeyedStateBackend";
        }
     
    +   /**
    +    * REMOVE
    +    */
    +   @Internal
    +   @Deprecated
    +   public Map<String, StateTable<K, ?, ?>> getStateTables() {
    +           return stateTables;
    +   }
    +
    +   @Deprecated
    +   private void restoreOldSavepointKeyedState(
    +                   Collection<KeyGroupsStateHandle> stateHandles) throws 
IOException, ClassNotFoundException {
    +
    +           if (stateHandles.isEmpty()) {
    +                   return;
    +           }
    +
    +           HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
    +                           
InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(),
 userCodeClassLoader);
    +
    +           for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState 
: namedStates.entrySet()) {
    +
    +                   KvStateSnapshot<K, ?, ?, ?> genericSnapshot = 
nameToState.getValue();
    +
    +                   final RestoredState restoredState;
    +
    +                   if (genericSnapshot instanceof 
AbstractMemStateSnapshot) {
    +
    +                           AbstractMemStateSnapshot<K, ?, ?, ?, ?> 
stateSnapshot =
    +                                           (AbstractMemStateSnapshot<K, ?, 
?, ?, ?>) nameToState.getValue();
    +
    +                           restoredState = restoreHeapState(stateSnapshot);
    +
    +                   } else if (genericSnapshot instanceof 
AbstractFsStateSnapshot) {
    +
    +                           AbstractFsStateSnapshot<K, ?, ?, ?, ?> 
stateSnapshot =
    +                                           (AbstractFsStateSnapshot<K, ?, 
?, ?, ?>) nameToState.getValue();
    +                           restoredState = restoreFsState(stateSnapshot);
    +                   } else {
    +                           throw new IllegalStateException("Unknown state: 
" + genericSnapshot);
    +                   }
    +
    +                   Map rawResultMap = restoredState.getRawResultMap();
    +                   TypeSerializer<?> namespaceSerializer = 
restoredState.getNamespaceSerializer();
    +                   TypeSerializer<?> stateSerializer = 
restoredState.getStateSerializer();
    +
    +                   if (namespaceSerializer instanceof VoidSerializer) {
    +                           namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE;
    +                   }
    +
    +                   Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
    +
    +                   if (null != nullNameSpaceFix) {
    +                           rawResultMap.put(VoidNamespace.INSTANCE, 
nullNameSpaceFix);
    +                   }
    +
    +                   StateTable<K, ?, ?> stateTable = new 
StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
    +                   stateTable.getState().set(0, rawResultMap);
    +
    +                   // add named state to the backend
    +                   getStateTables().put(nameToState.getKey(), stateTable);
    +           }
    +   }
    +
    +   private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, 
?, ?, ?> stateSnapshot) throws IOException {
    +           return new RestoredState(
    +                           stateSnapshot.deserialize(),
    +                           stateSnapshot.getNamespaceSerializer(),
    +                           stateSnapshot.getStateSerializer());
    +   }
    +
    +   private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, 
?, ?> stateSnapshot) throws IOException {
    +           FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
    +           //TODO register closeable to support fast cancelation?
    +           try (FSDataInputStream inStream = 
fs.open(stateSnapshot.getFilePath())) {
    +
    +                   DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(inStream);
    +
    +                   final int numKeys = inView.readInt();
    --- End diff --
    
    Minor comment: It seems this variable actually means `numNamespaces`.


> Implement savepoint backwards compatibility 1.1 -> 1.2
> ------------------------------------------------------
>
>                 Key: FLINK-5041
>                 URL: https://issues.apache.org/jira/browse/FLINK-5041
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>
> This issue tracks the implementation of backwards compatibility between Flink 
> 1.1 and 1.2 releases.
> This task subsumes:
> - Converting old savepoints to new savepoints, including a conversion of 
> state handles to their new replacement.
> - Converting keyed state from old backend implementations to their new 
> counterparts.
> - Converting operator and function state for all changed operators.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to