[ https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725646#comment-15725646 ]
ASF GitHub Bot commented on FLINK-5041: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2781#discussion_r91087211 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -346,4 +359,130 @@ public String toString() { return "HeapKeyedStateBackend"; } + /** + * REMOVE + */ + @Internal + @Deprecated + public Map<String, StateTable<K, ?, ?>> getStateTables() { + return stateTables; + } + + @Deprecated + private void restoreOldSavepointKeyedState( + Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException { + + if (stateHandles.isEmpty()) { + return; + } + + HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates = + InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader); + + for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) { + + KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue(); + + final RestoredState restoredState; + + if (genericSnapshot instanceof AbstractMemStateSnapshot) { + + AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot = + (AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue(); + + restoredState = restoreHeapState(stateSnapshot); + + } else if (genericSnapshot instanceof AbstractFsStateSnapshot) { + + AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot = + (AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue(); + restoredState = restoreFsState(stateSnapshot); + } else { + throw new IllegalStateException("Unknown state: " + genericSnapshot); + } + + Map rawResultMap = restoredState.getRawResultMap(); + TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer(); + TypeSerializer<?> stateSerializer = restoredState.getStateSerializer(); + + if (namespaceSerializer instanceof VoidSerializer) { + namespaceSerializer = 
VoidNamespaceSerializer.INSTANCE; + } + + Map nullNameSpaceFix = (Map) rawResultMap.remove(null); + + if (null != nullNameSpaceFix) { + rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix); + } + + StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange); + stateTable.getState().set(0, rawResultMap); + + // add named state to the backend + getStateTables().put(nameToState.getKey(), stateTable); + } + } + + private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException { + return new RestoredState( + stateSnapshot.deserialize(), + stateSnapshot.getNamespaceSerializer(), + stateSnapshot.getStateSerializer()); + } + + private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException { + FileSystem fs = stateSnapshot.getFilePath().getFileSystem(); + //TODO register closeable to support fast cancelation? + try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) { + + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream); + + final int numKeys = inView.readInt(); --- End diff -- Minor comment: The variable `numKeys` actually seems to hold the number of namespaces, so `numNamespaces` would be a more accurate name. > Implement savepoint backwards compatibility 1.1 -> 1.2 > ------------------------------------------------------ > > Key: FLINK-5041 > URL: https://issues.apache.org/jira/browse/FLINK-5041 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Affects Versions: 1.2.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > > This issue tracks the implementation of backwards compatibility between Flink > 1.1 and 1.2 releases. > This task subsumes: > - Converting old savepoints to new savepoints, including a conversion of > state handles to their new replacements. > - Converting keyed state from old backend implementations to their new > counterparts. 
> - Converting operator and function state for all changed operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)