Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3834#discussion_r115141548 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception protected <N, S> ColumnFamilyHandle getColumnFamily( StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException { - Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo = + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName()); - RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - namespaceSerializer, - descriptor.getSerializer()); + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + descriptor.getType(), + descriptor.getName(), + namespaceSerializer, + descriptor.getSerializer()); if (stateInfo != null) { - if (newMetaInfo.canRestoreFrom(stateInfo.f1)) { + // TODO with eager registration in place, these checks should be moved to restore() + + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = + restoredKvStateMetaInfos.get(descriptor.getName()); + + Preconditions.checkState( + newMetaInfo.getName().equals(restoredMetaInfo.getName()), + "Incompatible state names. " + + "Was [" + restoredMetaInfo.getName() + "], " + + "registered with [" + newMetaInfo.getName() + "]."); + + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) + && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()), + "Incompatible state types. 
" + + "Was [" + restoredMetaInfo.getStateType() + "], " + + "registered with [" + newMetaInfo.getStateType() + "]."); + } + + // check serializer migration strategies to determine if state migration is required + + boolean requireMigration = false; + + // only check migration strategy if there is a restored configuration snapshot; + // there wouldn't be one if we were restored from an older version checkpoint, + // in which case we can only simply assume that migration is not required + + if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) { + MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer() + .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot()); + + TypeSerializer<N> finalOldNamespaceSerializer; + if (namespaceMigrationStrategy.requireMigration()) { + requireMigration = true; + + if (namespaceMigrationStrategy.getFallbackDeserializer() != null) { + finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer(); + } else if (restoredMetaInfo.getNamespaceSerializer() != null + && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) { + finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer(); + } else { + throw new RuntimeException( + "State migration required, but there is no available serializer capable of reading previous namespace."); + } + } + } + + if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) { + MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer() + .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot()); + + TypeSerializer<S> finalOldStateSerializer; + if (stateMigrationStrategy.requireMigration()) { + requireMigration = true; + + if (stateMigrationStrategy.getFallbackDeserializer() != null) { --- End diff -- I'll change this to the suggested new flow :)
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---