[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999824#comment-15999824 ]
ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115141548

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
         protected <N, S> ColumnFamilyHandle getColumnFamily(
                 StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

    -        Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
    +        Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                 kvStateInformation.get(descriptor.getName());

    -        RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
    -                descriptor.getType(),
    -                descriptor.getName(),
    -                namespaceSerializer,
    -                descriptor.getSerializer());
    +        RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +            descriptor.getType(),
    +            descriptor.getName(),
    +            namespaceSerializer,
    +            descriptor.getSerializer());

             if (stateInfo != null) {
    -            if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
    +            // TODO with eager registration in place, these checks should be moved to restore()
    +
    +            RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    +                restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +            Preconditions.checkState(
    +                newMetaInfo.getName().equals(restoredMetaInfo.getName()),
    +                "Incompatible state names. " +
    +                    "Was [" + restoredMetaInfo.getName() + "], " +
    +                    "registered with [" + newMetaInfo.getName() + "].");
    +
    +            if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
    +
    +                Preconditions.checkState(
    +                    newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
    +                    "Incompatible state types. " +
    +                        "Was [" + restoredMetaInfo.getStateType() + "], " +
    +                        "registered with [" + newMetaInfo.getStateType() + "].");
    +            }
    +
    +            // check serializer migration strategies to determine if state migration is required
    +
    +            boolean requireMigration = false;
    +
    +            // only check migration strategy if there is a restored configuration snapshot;
    +            // there wouldn't be one if we were restored from an older version checkpoint,
    +            // in which case we can only simply assume that migration is not required
    +
    +            if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
    +                MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
    +                    .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
    +
    +                TypeSerializer<N> finalOldNamespaceSerializer;
    +                if (namespaceMigrationStrategy.requireMigration()) {
    +                    requireMigration = true;
    +
    +                    if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
    +                        finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
    +                    } else if (restoredMetaInfo.getNamespaceSerializer() != null
    +                        && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
    +                        finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
    +                    } else {
    +                        throw new RuntimeException(
    +                            "State migration required, but there is no available serializer capable of reading previous namespace.");
    +                    }
    +                }
    +            }
    +
    +            if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
    +                MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
    +                    .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
    +
    +                TypeSerializer<S> finalOldStateSerializer;
    +                if (stateMigrationStrategy.requireMigration()) {
    +                    requireMigration = true;
    +
    +                    if (stateMigrationStrategy.getFallbackDeserializer() != null) {
    --- End diff --

    I'll change this to the suggested new flow :)


> Integrate serializer reconfiguration into state restore flow to activate
> serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as
> the state's metainfo:
> - the previous serializer
> - a snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer.
> Deserialization may fail if a) the serializer no longer exists in the classpath,
> or b) the serializer class is no longer valid (i.e., its implementation changed
> and resulted in a different serialVersionUID). In this case, use a dummy
> serializer as a placeholder. This dummy serializer is currently the
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The
> configuration snapshot must be deserialized successfully; otherwise, the state
> restore fails.
> 3. When we get the newly registered serializer for the state (which could be a
> completely new serializer, the same serializer with a different
> implementation, or the exact same serializer untouched; in every case it is
> treated as a new serializer), we use the configuration snapshot of the old
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the
> result of the upgrade, state conversion may need to take place (for now, if
> state conversion is required, we just fail the job, as this functionality
> isn't available yet).
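Steps 1 to 3 can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not Flink's implementation: `UpgradableSerializer`, `ReconfigureResult`, `IntSerializerV1`, `PlaceholderSerializer` (standing in for `ClassNotFoundProxySerializer`) and `SerializerRestore` are hypothetical names, the configuration snapshot is modeled as a plain `String`, and the old serializer is assumed to be stored with standard Java serialization. Under that assumption, a class missing from the classpath surfaces as `ClassNotFoundException` and a changed `serialVersionUID` as `InvalidClassException`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical result of reconfiguring the new serializer (step 3); not Flink's API.
enum ReconfigureResult { COMPATIBLE, REQUIRES_CONVERSION }

// Hypothetical serializer abstraction that can be reconfigured from an old config snapshot.
interface UpgradableSerializer extends Serializable {
    ReconfigureResult reconfigure(String oldConfigSnapshot);
}

// Example serializer; its config snapshot is assumed to be just a version tag.
final class IntSerializerV1 implements UpgradableSerializer {
    private static final long serialVersionUID = 1L;

    @Override
    public ReconfigureResult reconfigure(String oldConfigSnapshot) {
        return "int-v1".equals(oldConfigSnapshot)
                ? ReconfigureResult.COMPATIBLE
                : ReconfigureResult.REQUIRES_CONVERSION;
    }
}

// Placeholder for an old serializer that could not be deserialized
// (the role ClassNotFoundProxySerializer plays in the issue).
final class PlaceholderSerializer implements UpgradableSerializer {
    private static final long serialVersionUID = 1L;

    @Override
    public ReconfigureResult reconfigure(String oldConfigSnapshot) {
        throw new UnsupportedOperationException("placeholder cannot be reconfigured");
    }
}

final class SerializerRestore {
    // Writes a serializer with standard Java serialization (the assumed checkpoint format).
    static byte[] serialize(UpgradableSerializer serializer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(serializer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Step 1: deserialize the old serializer, substituting a placeholder when its class
    // is missing (ClassNotFoundException) or its serialVersionUID changed (InvalidClassException).
    static UpgradableSerializer restoreOldSerializer(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (UpgradableSerializer) in.readObject();
        } catch (ClassNotFoundException | InvalidClassException e) {
            return new PlaceholderSerializer();
        } catch (IOException e) {
            // Any other read error (e.g. a corrupted stream) still fails the restore.
            throw new UncheckedIOException(e);
        }
    }

    // Steps 2 and 3: the old config snapshot must be available (modeled as non-null),
    // then it is used to reconfigure the newly registered serializer.
    static ReconfigureResult upgrade(String oldConfigSnapshot, UpgradableSerializer newSerializer) {
        if (oldConfigSnapshot == null) {
            throw new IllegalStateException("Missing serializer config snapshot; restore fails.");
        }
        ReconfigureResult result = newSerializer.reconfigure(oldConfigSnapshot);
        if (result != ReconfigureResult.COMPATIBLE) {
            // State conversion is not available yet, so the job fails, as the issue describes.
            throw new IllegalStateException("State conversion required but not supported yet.");
        }
        return result;
    }
}
```

Catching the two exceptions from step 1 separately from other `IOException`s keeps a corrupted stream a hard failure, while only the two cases named in step 1 fall back to the placeholder.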
> The results could be:
> - Compatible: restore succeeds and the serializer is upgraded.
> - Compatible, but the serialization schema changed: the serializer is upgraded,
> but state conversion is required; the old serializer does not need to be present.
> - Incompatible: the upgraded serializer requires state conversion, and the
> conversion requires the old serializer to be present (i.e., it cannot be the dummy
> {{ClassNotFoundProxySerializer}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
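When migration is required, the reviewed diff decides which serializer should read the previously written bytes; the Incompatible result is the branch where that reader has to be the old serializer itself. A hedged plain-Java sketch of that decision order follows. `StateSerializer`, `PlaceholderStateSerializer`, `MigrationDecision` and `OldStateReaderSelector` are hypothetical stand-ins for Flink's `TypeSerializer`, the placeholder proxy serializers (`MigrationNamespaceSerializerProxy` / `ClassNotFoundProxySerializer`) and `MigrationStrategy`. Relating the fallback-deserializer branch to the "Compatible, but the serialization schema changed" result is an interpretation, not something the issue states.

```java
import java.util.Optional;

// Hypothetical stand-in for a serializer that can read previously written state.
interface StateSerializer {}

// Plays the role of the dummy proxy restored when the old serializer could not be
// deserialized; it cannot read any state.
final class PlaceholderStateSerializer implements StateSerializer {}

// Hypothetical summary of what reconfiguring the new serializer reported.
final class MigrationDecision {
    final boolean requiresMigration;
    // Serializer offered by the new serializer for reading the old format; may be null.
    final StateSerializer fallbackDeserializer;

    MigrationDecision(boolean requiresMigration, StateSerializer fallbackDeserializer) {
        this.requiresMigration = requiresMigration;
        this.fallbackDeserializer = fallbackDeserializer;
    }
}

final class OldStateReaderSelector {
    // Mirrors the order in the reviewed diff: no migration means nothing extra is needed;
    // otherwise prefer the fallback deserializer, then a genuinely restored old
    // serializer, and fail if only the placeholder is available.
    static Optional<StateSerializer> select(MigrationDecision decision, StateSerializer restoredOld) {
        if (!decision.requiresMigration) {
            return Optional.empty(); // Compatible: no state conversion needed.
        }
        if (decision.fallbackDeserializer != null) {
            return Optional.of(decision.fallbackDeserializer);
        }
        if (restoredOld != null && !(restoredOld instanceof PlaceholderStateSerializer)) {
            return Optional.of(restoredOld); // Incompatible: the old serializer must be present.
        }
        throw new IllegalStateException(
                "State migration required, but no available serializer can read the previous state.");
    }
}
```

Returning `Optional.empty()` for the no-migration case keeps "nothing to convert" distinct from the failure case, which is raised as an exception just as the diff throws a `RuntimeException`.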