Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5885#discussion_r184101616 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1125,59 +1125,62 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance( * that we checkpointed, i.e. is already in the map of column families. */ @SuppressWarnings("rawtypes, unchecked") - protected <N, S> ColumnFamilyHandle getColumnFamily( + protected <N, S> Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> getColumnFamilyAndStateSerializer( StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException { Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName()); - RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - namespaceSerializer, - descriptor.getSerializer()); - if (stateInfo != null) { // TODO with eager registration in place, these checks should be moved to restore() RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo = (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName()); Preconditions.checkState( - Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), + Objects.equals(descriptor.getName(), restoredMetaInfo.getName()), "Incompatible state names. 
" + "Was [" + restoredMetaInfo.getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); + "registered with [" + descriptor.getName() + "]."); - if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) + if (!Objects.equals(descriptor.getType(), StateDescriptor.Type.UNKNOWN) && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { Preconditions.checkState( - newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), + descriptor.getType() == restoredMetaInfo.getStateType(), "Incompatible state types. " + "Was [" + restoredMetaInfo.getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); + "registered with [" + descriptor.getType() + "]."); } // check compatibility results to determine if state migration is required + TypeSerializer<N> newNamespaceSerializer = namespaceSerializer.duplicate(); CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( restoredMetaInfo.getNamespaceSerializer(), null, restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); + newNamespaceSerializer); + TypeSerializer<S> newStateSerializer = descriptor.getSerializer().duplicate(); --- End diff -- The `duplicate()` call here looks redundant, because the serializer comes from the descriptor, whose `getSerializer()` already returns a duplicate.
---