Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202325526 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary( + StateDescriptor<S, SV> stateDesc, + TypeSerializer<N> namespaceSerializer, + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer<SV> stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if (stateCompatibility.isIncompatible()) { --- End diff -- refactoring ð
---