[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999769#comment-15999769 ]

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139044
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
        protected <N, S> ColumnFamilyHandle getColumnFamily(
                        StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {
     
    -           Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
    +           Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
                                kvStateInformation.get(descriptor.getName());
     
    -           RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
    -                           descriptor.getType(),
    -                           descriptor.getName(),
    -                           namespaceSerializer,
    -                           descriptor.getSerializer());
    +           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +                   descriptor.getType(),
    +                   descriptor.getName(),
    +                   namespaceSerializer,
    +                   descriptor.getSerializer());
     
                if (stateInfo != null) {
    -                   if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
    +                   // TODO with eager registration in place, these checks should be moved to restore()
    +
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
    +                           restoredKvStateMetaInfos.get(descriptor.getName());
    +
    +                   Preconditions.checkState(
    +                           newMetaInfo.getName().equals(restoredMetaInfo.getName()),
    +                           "Incompatible state names. " +
    +                                   "Was [" + restoredMetaInfo.getName() + "], " +
    +                                   "registered with [" + newMetaInfo.getName() + "].");
    +
    +                   if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
    +                           && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
    +
    +                           Preconditions.checkState(
    +                                   newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
    +                                   "Incompatible state types. " +
    +                                           "Was [" + restoredMetaInfo.getStateType() + "], " +
    +                                           "registered with [" + newMetaInfo.getStateType() + "].");
    +                   }
    +
    +                   // check serializer migration strategies to determine if state migration is required
    +
    +                   boolean requireMigration = false;
    +
    +                   // only check migration strategy if there is a restored configuration snapshot;
    +                   // there wouldn't be one if we were restored from an older version checkpoint,
    +                   // in which case we can only simply assume that migration is not required
    +
    +                   if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
    +                           MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
    +                                   .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
    +
    +                           TypeSerializer<N> finalOldNamespaceSerializer;
    +                           if (namespaceMigrationStrategy.requireMigration()) {
    +                                   requireMigration = true;
    +
    +                                   if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
    +                                           finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
    +                                   } else if (restoredMetaInfo.getNamespaceSerializer() != null
    +                                           && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
    +                                           finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
    +                                   } else {
    +                                           throw new RuntimeException(
    +                                                   "State migration required, but there is no available serializer capable of reading previous namespace.");
    +                                   }
    +                           }
    +                   }
    +
    +                   if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
    --- End diff --
    
    This looks almost like a duplicate of the previous `if` block, which handles
    the namespace serializer. Maybe we can extract a common helper method that
    covers both cases and avoids the (partial) code duplication.
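
A minimal sketch of what such a helper could look like, not the actual change in the pull request. `Serializer`, `PlaceholderSerializer`, `resolveOldSerializer` and `strategy` are hypothetical stand-ins introduced here so the example compiles on its own; only `requireMigration()` and `getFallbackDeserializer()` mirror the calls visible in the diff, and the placeholder marker plays the role that `MigrationNamespaceSerializerProxy` plays there.

```java
import java.util.Optional;

final class MigrationCheckSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer<T> {}

    /** Hypothetical marker for placeholder serializers that cannot read old data. */
    interface PlaceholderSerializer {}

    /** Stand-in exposing only the two MigrationStrategy methods used in the diff. */
    interface MigrationStrategy<T> {
        boolean requireMigration();
        Serializer<T> getFallbackDeserializer(); // may return null
    }

    /**
     * Shared logic for the namespace and the state serializer checks.
     * Returns empty if no migration is required, otherwise the serializer
     * that can read the old data. Throws if migration is required but no
     * usable serializer exists.
     */
    static <T> Optional<Serializer<T>> resolveOldSerializer(
            MigrationStrategy<T> strategy,
            Serializer<T> restoredSerializer,
            String description) {

        if (!strategy.requireMigration()) {
            return Optional.empty();
        }
        // Prefer the fallback deserializer offered by the migration strategy.
        Serializer<T> fallback = strategy.getFallbackDeserializer();
        if (fallback != null) {
            return Optional.of(fallback);
        }
        // Otherwise use the restored serializer, unless it is only a placeholder.
        if (restoredSerializer != null
                && !(restoredSerializer instanceof PlaceholderSerializer)) {
            return Optional.of(restoredSerializer);
        }
        throw new IllegalStateException(
            "State migration required, but there is no available serializer "
                + "capable of reading previous " + description + ".");
    }

    /** Small factory for building a strategy in examples (hypothetical). */
    static <T> MigrationStrategy<T> strategy(boolean migrate, Serializer<T> fallback) {
        return new MigrationStrategy<T>() {
            @Override public boolean requireMigration() { return migrate; }
            @Override public Serializer<T> getFallbackDeserializer() { return fallback; }
        };
    }
}
```

With a helper of this shape, both `if` blocks would reduce to one call each, and `requireMigration` could be derived from whether either call returns a non-empty result.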


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the old serializer. 
> Deserialization may fail if a) the serializer no longer exists on the 
> classpath, or b) the serializer class is no longer valid (i.e., the 
> implementation changed and resulted in a different serialVersionUID). In this 
> case, use a dummy serializer as a placeholder. This dummy serializer is 
> currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the newly registered serializer for the state (it could be a 
> completely new serializer, the same serializer with a different 
> implementation, or the exact same serializer untouched; either way it is 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: the restore succeeds and the serializer is upgraded.
> - Compatible, but serialization schema changed: the serializer is upgraded 
> but state conversion is required; the old serializer does not need to be 
> present.
> - Incompatible: the serializer is upgraded and state conversion is required, 
> and the old serializer must be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).
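
The three outcomes listed in the issue description can be sketched as a small decision function. This is only an illustration under stated assumptions: the `Result` and `Action` enums and the `decide` method are hypothetical names, not part of Flink's API. As the description notes, the job currently just fails whenever state conversion is required, so `CONVERT_STATE` stands for functionality that does not exist yet.

```java
final class UpgradeOutcomeSketch {

    /** The three reconfiguration results from the issue (hypothetical names). */
    enum Result { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

    /** What the restore path would do next (hypothetical names). */
    enum Action { RESTORE, CONVERT_STATE, FAIL_OLD_SERIALIZER_MISSING }

    /**
     * Maps a reconfiguration result to the next step.
     *
     * @param oldSerializerIsDummy true if the old serializer could not be
     *        deserialized and was replaced by the dummy placeholder
     */
    static Action decide(Result result, boolean oldSerializerIsDummy) {
        switch (result) {
            case COMPATIBLE:
                // Restore succeeds; no conversion needed.
                return Action.RESTORE;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion needed, but the old serializer is not required.
                return Action.CONVERT_STATE;
            case INCOMPATIBLE:
                // Conversion needed and only the real old serializer can read the data.
                return oldSerializerIsDummy
                        ? Action.FAIL_OLD_SERIALIZER_MISSING
                        : Action.CONVERT_STATE;
            default:
                throw new IllegalArgumentException("Unknown result: " + result);
        }
    }
}
```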



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
