[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999776#comment-15999776
 ] 

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139339
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
        protected <N, S> ColumnFamilyHandle getColumnFamily(
                        StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
     
    -           Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
    +           Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                                kvStateInformation.get(descriptor.getName());
     
    -           RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
    -                           descriptor.getType(),
    -                           descriptor.getName(),
    -                           namespaceSerializer,
    -                           descriptor.getSerializer());
    +           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +                   descriptor.getType(),
    +                   descriptor.getName(),
    +                   namespaceSerializer,
    +                   descriptor.getSerializer());
     
                if (stateInfo != null) {
    -                   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
    +                   // TODO with eager registration in place, these checks should be moved to restore()
    +
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    +                           restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +                   Preconditions.checkState(
    +                           newMetaInfo.getName().equals(restoredMetaInfo.getName()),
    +                           "Incompatible state names. " +
    +                                   "Was [" + restoredMetaInfo.getName() + "], " +
    +                                   "registered with [" + newMetaInfo.getName() + "].");
    +
    +                   if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                           && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
    +
    +                           Preconditions.checkState(
    +                                   newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
    +                                   "Incompatible state types. " +
    +                                           "Was [" + restoredMetaInfo.getStateType() + "], " +
    +                                           "registered with [" + newMetaInfo.getStateType() + "].");
    +                   }
    +
    +                   // check serializer migration strategies to determine if state migration is required
    +
    +                   boolean requireMigration = false;
    +
    +                   // only check migration strategy if there is a restored configuration snapshot;
    +                   // there wouldn't be one if we were restored from an older version checkpoint,
    +                   // in which case we can only simply assume that migration is not required
    +
    +                   if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
    +                           MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
    +                                   .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
    +
    +                           TypeSerializer<N> finalOldNamespaceSerializer;
    +                           if (namespaceMigrationStrategy.requireMigration()) {
    +                                   requireMigration = true;
    +
    +                                   if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
    +                                           finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
    +                                   } else if (restoredMetaInfo.getNamespaceSerializer() != null
    +                                           && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
    +                                           finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
    +                                   } else {
    +                                           throw new RuntimeException(
    +                                                   "State migration required, but there is no available serializer capable of reading previous namespace.");
    +                                   }
    +                           }
    +                   }
    +
    +                   if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
    +                           MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
    +                                   .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
    +
    +                           TypeSerializer<S> finalOldStateSerializer;
    +                           if (stateMigrationStrategy.requireMigration()) {
    +                                   requireMigration = true;
    +
    +                                   if (stateMigrationStrategy.getFallbackDeserializer() != null) {
    --- End diff --
    
    This whole `if` is interesting: basically, you give the `FallbackDeserializer` priority over the actual old serializer that we could get via Java serialization.
    Originally, I thought the intended flow was: check compatibility by confronting the user-provided serializer with the stored config. In case we need to convert, first try to load the former serializer through Java serialization (because it is a safe bet that this class can read the old state if we succeed). If Java deserialization fails, we use the `FallbackDeserializer` provided by the new serializer (this should also be a safe bet, except if the implementation is wrong, so overall slightly less safe).
    
    Now here is the question: I think in the serializer you combine the compatibility check and the potential creation of the `FallbackDeserializer` in a single method, because two separate methods would partially do similar, duplicated work.
    The downside is that, given the intended flow, we only need to create and use a `FallbackDeserializer` if we cannot load the old serializer through Java deserialization.
    
    I can see that this flow could also work, or we can still use the flow that prioritizes Java serialization and potentially creates an unused `FallbackDeserializer`.
    
    What do you think, and is there some point I did not consider that led to this choice?
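
To make the two orderings concrete, here is a minimal sketch of the selection logic only. It is not the PR's code: the class, enum, and method names are hypothetical, and the two booleans stand in for "the migration strategy returned a fallback deserializer" and "the old serializer was restored through Java serialization and is not a proxy".

```java
/** Hypothetical sketch; the names here are illustrative, not Flink API. */
final class OldSerializerSelectionSketch {

    /** Which reader ends up decoding the previously written state. */
    enum Source { FALLBACK_DESERIALIZER, JAVA_DESERIALIZED_OLD_SERIALIZER }

    /** Order used in the diff: the fallback deserializer wins if it exists. */
    static Source fallbackFirst(boolean fallbackAvailable, boolean oldSerializerLoaded) {
        if (fallbackAvailable) {
            return Source.FALLBACK_DESERIALIZER;
        }
        if (oldSerializerLoaded) {
            return Source.JAVA_DESERIALIZED_OLD_SERIALIZER;
        }
        throw new IllegalStateException("No serializer available to read the previous state.");
    }

    /** Alternative order: prefer the old serializer restored through Java serialization. */
    static Source javaSerializationFirst(boolean fallbackAvailable, boolean oldSerializerLoaded) {
        if (oldSerializerLoaded) {
            return Source.JAVA_DESERIALIZED_OLD_SERIALIZER;
        }
        if (fallbackAvailable) {
            return Source.FALLBACK_DESERIALIZER;
        }
        throw new IllegalStateException("No serializer available to read the previous state.");
    }

    public static void main(String[] args) {
        // The two orders only disagree when both readers are available.
        System.out.println(fallbackFirst(true, true));
        System.out.println(javaSerializationFirst(true, true));
    }
}
```

The outcome differs only when both readers exist; in every other case the two flows pick the same reader or fail in the same way.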


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. 
> Deserialization may fail if a) the serializer no longer exists on the classpath, 
> or b) the serializer class is no longer valid (i.e., the implementation changed 
> and resulted in a different serialVersionUID). In this case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: serializer upgraded, but requires state conversion and 
> requires the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).
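
The three outcomes listed in the issue description can be summarized as a small decision table. The sketch below is only an illustration under assumptions: the enum and method names are hypothetical and do not appear in Flink, and it models the rules as described rather than the current behavior, where any required state conversion simply fails the job.

```java
/** Hypothetical sketch of the three upgrade outcomes; not Flink API. */
final class UpgradeOutcomeSketch {

    enum Result { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

    /** State conversion is needed for every outcome except plain COMPATIBLE. */
    static boolean requiresStateConversion(Result result) {
        return result != Result.COMPATIBLE;
    }

    /** Only INCOMPATIBLE needs the real old serializer to read the old state. */
    static boolean requiresOldSerializer(Result result) {
        return result == Result.INCOMPATIBLE;
    }

    /**
     * Whether the prerequisites for restoring are met, given whether the
     * restored old serializer is only the dummy placeholder.
     */
    static boolean prerequisitesMet(Result result, boolean oldSerializerIsPlaceholder) {
        return !(requiresOldSerializer(result) && oldSerializerIsPlaceholder);
    }

    public static void main(String[] args) {
        for (Result result : Result.values()) {
            System.out.println(result
                    + " conversion=" + requiresStateConversion(result)
                    + " okWithPlaceholder=" + prerequisitesMet(result, true));
        }
    }
}
```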



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
