[ https://issues.apache.org/jira/browse/FLINK-5041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725651#comment-15725651 ]
ASF GitHub Bot commented on FLINK-5041: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2781#discussion_r91086437 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -346,4 +359,130 @@ public String toString() { return "HeapKeyedStateBackend"; } + /** + * REMOVE + */ + @Internal + @Deprecated + public Map<String, StateTable<K, ?, ?>> getStateTables() { + return stateTables; + } + + @Deprecated + private void restoreOldSavepointKeyedState( + Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException { + + if (stateHandles.isEmpty()) { + return; + } + + HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates = + InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader); + + for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) { + + KvStateSnapshot<K, ?, ?, ?> genericSnapshot = nameToState.getValue(); + + final RestoredState restoredState; + + if (genericSnapshot instanceof AbstractMemStateSnapshot) { + + AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot = + (AbstractMemStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue(); + + restoredState = restoreHeapState(stateSnapshot); + + } else if (genericSnapshot instanceof AbstractFsStateSnapshot) { + + AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot = + (AbstractFsStateSnapshot<K, ?, ?, ?, ?>) nameToState.getValue(); + restoredState = restoreFsState(stateSnapshot); + } else { + throw new IllegalStateException("Unknown state: " + genericSnapshot); + } + + Map rawResultMap = restoredState.getRawResultMap(); + TypeSerializer<?> namespaceSerializer = restoredState.getNamespaceSerializer(); + TypeSerializer<?> stateSerializer = restoredState.getStateSerializer(); + + if (namespaceSerializer instanceof VoidSerializer) { + namespaceSerializer = VoidNamespaceSerializer.INSTANCE; + } + + Map nullNameSpaceFix = (Map) rawResultMap.remove(null); --- End diff -- This looks like it should be guarded by a test ;-) > Implement savepoint backwards compatibility 1.1 -> 1.2 > ------------------------------------------------------ > > Key: FLINK-5041 > URL: https://issues.apache.org/jira/browse/FLINK-5041 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing > Affects Versions: 1.2.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > > This issue tracks the implementation of backwards compatibility between Flink > 1.1 and 1.2 releases. > This task subsumes: > - Converting old savepoints to new savepoints, including a conversion of > state handles to their new replacement. > - Converting keyed state from old backend implementations to their new > counterparts. > - Converting operator and function state for all changed operators. -- This message was sent by Atlassian JIRA (v6.3.4#6332)