[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999776#comment-15999776 ]
ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139339

    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
     protected <N, S> ColumnFamilyHandle getColumnFamily(
         StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
    -  Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
    +  Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
         kvStateInformation.get(descriptor.getName());
    -  RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
    -    descriptor.getType(),
    -    descriptor.getName(),
    -    namespaceSerializer,
    -    descriptor.getSerializer());
    +  RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +    descriptor.getType(),
    +    descriptor.getName(),
    +    namespaceSerializer,
    +    descriptor.getSerializer());
       if (stateInfo != null) {
    -    if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
    +    // TODO with eager registration in place, these checks should be moved to restore()
    +
    +    RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    +      restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +    Preconditions.checkState(
    +      newMetaInfo.getName().equals(restoredMetaInfo.getName()),
    +      "Incompatible state names. " +
    +        "Was [" + restoredMetaInfo.getName() + "], " +
    +        "registered with [" + newMetaInfo.getName() + "].");
    +
    +    if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +        && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
    +
    +      Preconditions.checkState(
    +        newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
    +        "Incompatible state types. " +
    +          "Was [" + restoredMetaInfo.getStateType() + "], " +
    +          "registered with [" + newMetaInfo.getStateType() + "].");
    +    }
    +
    +    // check serializer migration strategies to determine if state migration is required
    +
    +    boolean requireMigration = false;
    +
    +    // only check migration strategy if there is a restored configuration snapshot;
    +    // there wouldn't be one if we were restored from an older version checkpoint,
    +    // in which case we can only simply assume that migration is not required
    +
    +    if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
    +      MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
    +        .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
    +
    +      TypeSerializer<N> finalOldNamespaceSerializer;
    +      if (namespaceMigrationStrategy.requireMigration()) {
    +        requireMigration = true;
    +
    +        if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
    +          finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
    +        } else if (restoredMetaInfo.getNamespaceSerializer() != null
    +            && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
    +          finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
    +        } else {
    +          throw new RuntimeException(
    +            "State migration required, but there is no available serializer capable of reading previous namespace.");
    +        }
    +      }
    +    }
    +
    +    if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
    +      MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
    +        .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
    +
    +      TypeSerializer<S> finalOldStateSerializer;
    +      if (stateMigrationStrategy.requireMigration()) {
    +        requireMigration = true;
    +
    +        if (stateMigrationStrategy.getFallbackDeserializer() != null) {
    --- End diff --

    This whole `if` is interesting: basically you give the `FallbackDeserializer` priority over the actual old serializer that we could get via Java serialization. Originally, I thought the intended flow was: check compatibility by confronting the user-provided serializer with the stored config. In case we need to convert, first try to load the former serializer through Java serialization (because it is a safe bet that this class can read the old state if we succeed). If Java deserialization fails, we use the `FallbackSerializer` provided by the new serializer (this should also be a safe bet, except if the implementation is wrong, so overall slightly less safe).

    Now here is the question: I think in the serializer you combine the compatibility check and the potential creation of the `FallbackSerializer` in a single method, because two separate methods would partially do similar and duplicated work. The downside is that, given the intended flow, we only need to create and use a `FallbackSerializer` if we cannot load the old serializer through Java deserialization. I can see that this flow could also work, or we can still use the flow that prioritizes Java serialization and potentially creates an unused `FallbackSerializer`. What do you think, and is there some point I did not consider that led to this choice?

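To make the two orderings discussed in the comment concrete, here is a minimal, self-contained sketch. It is not Flink code: `Reader`, `NamedReader`, `ProxyReader`, `fallbackFirst`, and `restoredFirst` are hypothetical names introduced only for illustration, with `ProxyReader` standing in for the placeholder used when Java deserialization of the old serializer fails.

```java
final class OldSerializerChoice {

    /** Hypothetical stand-in for a serializer that can read old state. */
    interface Reader {
        String name();
    }

    /** Hypothetical placeholder for "old serializer could not be Java-deserialized". */
    static final class ProxyReader implements Reader {
        public String name() {
            return "proxy";
        }
    }

    /** Simple named reader so the chosen candidate can be inspected. */
    static final class NamedReader implements Reader {
        private final String name;

        NamedReader(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }
    }

    /** A restored serializer is usable only if it exists and is not the placeholder. */
    static boolean isUsable(Reader restored) {
        return restored != null && !(restored instanceof ProxyReader);
    }

    /** Ordering as in the diff: fallback deserializer first, restored old serializer second. */
    static Reader fallbackFirst(Reader fallback, Reader restored) {
        if (fallback != null) {
            return fallback;
        }
        if (isUsable(restored)) {
            return restored;
        }
        throw new IllegalStateException("No serializer capable of reading previous state.");
    }

    /** Ordering suggested in the review: restored old serializer first, fallback second. */
    static Reader restoredFirst(Reader fallback, Reader restored) {
        if (isUsable(restored)) {
            return restored;
        }
        if (fallback != null) {
            return fallback;
        }
        throw new IllegalStateException("No serializer capable of reading previous state.");
    }
}
```

The two methods differ only when both candidates are available; in that case the second ordering never uses the fallback, which is the "potentially unused `FallbackSerializer`" trade-off raised above.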
> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as
> the state's metainfo:
> - the previous serializer
> - a snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer.
> Deserialization may fail if a) the serializer no longer exists in the classpath,
> or b) the serializer class is no longer valid (i.e., the implementation changed
> and resulted in a different serialVersionUID). In this case, use a dummy
> serializer as a placeholder. This dummy serializer is currently the
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The
> configuration snapshot must be successfully deserialized, otherwise the state
> restore fails.
> 3. When we get the new registered serializer for the state (could be a
> completely new serializer, the same serializer with a different
> implementation, or the exact same serializer untouched; either way it is
> seen as a new serializer), we use the configuration snapshot of the old
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the
> result of the upgrade, state conversion may need to take place (for now, if
> state conversion is required, we just fail the job as this functionality
> isn't available yet). The results could be:
> - Compatible: restore succeeds and the serializer is upgraded.
> - Compatible, but serialization schema changed: the serializer is upgraded but
> state conversion is required, without the requirement that the old serializer
> be present.
> - Incompatible: the serializer is upgraded and state conversion is required, and
> conversion requires the old serializer to be present (i.e., it cannot be the dummy
> {{ClassNotFoundProxySerializer}}).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)