Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202293217 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private <N, S extends State, SV> RegisteredKeyedBackendStateMetaInfo<N, SV> migrateStateIfNecessary( + StateDescriptor<S, SV> stateDesc, + TypeSerializer<N> namespaceSerializer, + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV> restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, SV>) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer<SV> stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo<N, SV> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility<SV> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if (stateCompatibility.isIncompatible()) { + if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) { + throw new UnsupportedOperationException( + "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported."); + } + + LOG.info( + "Performing state migration for state {} because the state serializer changed in an incompatible way.", + stateDesc); + + // we need to get an actual state instance because migration is different + // for different state types. For example, ListState needs to deal with + // individual elements + StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + + State state = stateFactory.createState( + stateDesc, + Tuple2.of(stateInfo.f0, newMetaInfo), + RocksDBKeyedStateBackend.this); + + if (!(state instanceof AbstractRocksDBState)) { + throw new FlinkRuntimeException( + "State should be an AbstractRocksDBState but is " + state); + } + + AbstractRocksDBState rocksDBState = (AbstractRocksDBState<?, N, ?, S>) state; + + Snapshot rocksDBSnapshot = null; + RocksIteratorWrapper iterator = null; + + try (ReadOptions readOptions = new ReadOptions();) { --- End diff -- I would suggest to try this: ``` Snapshot rocksDBSnapshot = db.getSnapshot(); try ( ReadOptions readOptions = new ReadOptions().setSnapshot(rocksDBSnapshot); RocksIteratorWrapper iterator = getRocksIterator(db, stateInfo.f0, readOptions)) { iterator.seekToFirst(); (...) } finally { db.releaseSnapshot(rocksDBSnapshot); rocksDBSnapshot.close(); } ```
---