[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999777#comment-15999777 ]

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139379
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/MigrationStrategy.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.common.typeutils;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +/**
    + * A {@code MigrationStrategy} contains information about how to perform migration of data written
    + * by an older serializer so that new serializers can continue to work on them.
    + *
    + * @param <T> the type of the data being migrated.
    + */
    +@PublicEvolving
    +public final class MigrationStrategy<T> {
    +
    +   /** Whether or not migration is required. */
    +   private final boolean requiresStateMigration;
    +
    +   /**
    +    * The fallback deserializer to use, in the case the preceding serializer cannot be found.
    +    *
    +    * <p>This is only relevant if migration is required.
    +    */
    +   private final TypeSerializer<T> fallbackDeserializer;
    +
    +   /**
    +    * Returns a strategy that simply signals that no migration needs to be performed.
    +    *
    +    * @return a strategy that does not perform migration
    +    */
    +   public static <T> MigrationStrategy<T> noMigration() {
    +           return new MigrationStrategy<>(false, null);
    +   }
    +
    +   /**
    +    * Returns a strategy that signals migration to be performed, and in the case that the
    +    * preceding serializer cannot be found, a provided fallback deserializer can be
    +    * used.
    +    *
    +    * @param fallbackDeserializer a fallback deserializer that can be used to read old data for the migration
    +    *                             in the case that the preceding serializer cannot be found.
    +    *
    +    * @return a strategy that performs migration with a fallback deserializer to read old data.
    +    */
    +   public static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(TypeSerializer<T> fallbackDeserializer) {
    +           return new MigrationStrategy<>(true, fallbackDeserializer);
    +   }
    +
    +   /**
    +    * Returns a strategy that signals migration to be performed, without a fallback deserializer.
    +    * If the preceding serializer cannot be found, the migration fails because the old data cannot be read.
    +    *
    +    * @return a strategy that performs migration, without a fallback deserializer.
    +    */
    +   public static <T> MigrationStrategy<T> migrate() {
    --- End diff --
    
    Maybe this should be named `migrationRequired`?
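A minimal, self-contained Java sketch of the strategy with the factory renamed as suggested, plus one way a restore path might consume it. `SerializerStub`, `MigrationStrategySketch`, `StrategyUsage`, and `deserializerForOldState` are hypothetical stand-ins rather than Flink API, and the body of `migrationRequired()` is an assumption, since the diff above is cut off at `migrate()`.

```java
// Hypothetical stand-in for Flink's TypeSerializer<T>, so the sketch compiles on its own.
interface SerializerStub<T> {
    String name();
}

// Simplified mirror of the MigrationStrategy in the diff, using the name suggested in review.
final class MigrationStrategySketch<T> {

    private final boolean requiresStateMigration;
    private final SerializerStub<T> fallbackDeserializer; // only meaningful if migration is required

    private MigrationStrategySketch(boolean requiresStateMigration, SerializerStub<T> fallbackDeserializer) {
        this.requiresStateMigration = requiresStateMigration;
        this.fallbackDeserializer = fallbackDeserializer;
    }

    static <T> MigrationStrategySketch<T> noMigration() {
        return new MigrationStrategySketch<>(false, null);
    }

    static <T> MigrationStrategySketch<T> migrateWithFallbackDeserializer(SerializerStub<T> fallback) {
        return new MigrationStrategySketch<>(true, fallback);
    }

    // Assumed body: the diff is truncated at the original migrate() method.
    static <T> MigrationStrategySketch<T> migrationRequired() {
        return new MigrationStrategySketch<>(true, null);
    }

    boolean requiresStateMigration() {
        return requiresStateMigration;
    }

    SerializerStub<T> getFallbackDeserializer() {
        return fallbackDeserializer;
    }
}

final class StrategyUsage {

    // Hypothetical helper: picks the serializer used to read existing state.
    // restoredPrevious is null when the preceding serializer could not be restored.
    static <T> SerializerStub<T> deserializerForOldState(
            MigrationStrategySketch<T> strategy,
            SerializerStub<T> restoredPrevious,
            SerializerStub<T> newSerializer) {
        if (!strategy.requiresStateMigration()) {
            return newSerializer; // no migration: the new serializer reads the old data directly
        }
        if (restoredPrevious != null) {
            return restoredPrevious;
        }
        if (strategy.getFallbackDeserializer() != null) {
            return strategy.getFallbackDeserializer();
        }
        throw new IllegalStateException(
                "Migration required, but the preceding serializer cannot be found and no fallback was given.");
    }
}
```

With this naming, `noMigration()` and `migrationRequired()` read as direct answers to whether migration is required, while `migrateWithFallbackDeserializer(...)` additionally names the reader to fall back on.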


> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - a snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. Deserialization may
> fail if a) the serializer class no longer exists in the classpath, or b) the
> serializer class is no longer valid (i.e., its implementation changed and
> resulted in a different serialVersionUID). In either case, use a dummy
> serializer as a placeholder. This dummy serializer is currently the
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The
> configuration snapshot must be deserialized successfully; otherwise, the state
> restore fails.
> 3. When we obtain the newly registered serializer for the state (which could be
> a completely new serializer, the same serializer with a different
> implementation, or the exact same, untouched serializer; in every case it is
> treated as a new serializer), we use the configuration snapshot of the old
> serializer to reconfigure the new serializer.
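Steps 1 and 2 can be sketched with plain Java serialization. This is a minimal sketch under stated assumptions: both the previous serializer and its configuration snapshot are assumed to be written with `ObjectOutputStream`, a missing class is simulated by the hypothetical `MissingClassInputStream`, `OldSerializer` is a hypothetical previous serializer, and `PlaceholderSerializer` only plays the role of `ClassNotFoundProxySerializer`. Step 3 (reconfiguration) is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Set;

// Hypothetical previous serializer that was written into the checkpoint.
final class OldSerializer implements Serializable {
    private static final long serialVersionUID = 1L;
}

// Plays the role of ClassNotFoundProxySerializer: a dummy that only records why loading failed.
final class PlaceholderSerializer {
    final String reason;

    PlaceholderSerializer(String reason) {
        this.reason = reason;
    }
}

// Hypothetical helper that simulates classes being absent from the classpath at restore time.
final class MissingClassInputStream extends ObjectInputStream {
    private final Set<String> missing;

    MissingClassInputStream(InputStream in, Set<String> missing) throws IOException {
        super(in);
        this.missing = missing;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        if (missing.contains(desc.getName())) {
            throw new ClassNotFoundException(desc.getName());
        }
        return super.resolveClass(desc);
    }
}

final class RestoreSketch {

    static byte[] javaSerialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Step 1: a missing class (a) or a changed serialVersionUID (b) yields a placeholder instead of failing.
    static Object restorePreviousSerializer(byte[] bytes, Set<String> missingClasses) {
        try (ObjectInputStream in = new MissingClassInputStream(new ByteArrayInputStream(bytes), missingClasses)) {
            return in.readObject();
        } catch (ClassNotFoundException | InvalidClassException e) {
            return new PlaceholderSerializer(e.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Step 2: the configuration snapshot must be readable; otherwise the restore fails.
    static Object restoreConfigSnapshot(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("State restore failed: unreadable configuration snapshot", e);
        }
    }
}
```

The asymmetry is deliberate: a missing serializer class only degrades to a placeholder, whereas an unreadable configuration snapshot aborts the restore, matching steps 1 and 2.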
> This completes the upgrade of the old serializer. However, depending on the
> result of the upgrade, state conversion may need to take place (for now, if
> state conversion is required, we simply fail the job, as this functionality
> isn't available yet). The possible results are:
> - Compatible: the restore succeeds and the serializer is upgraded.
> - Compatible, but the serialization schema changed: the serializer is upgraded
> but requires state conversion; the old serializer does not need to be present.
> - Incompatible: the serializer is upgraded but requires state conversion, and
> the old serializer must be present (i.e., it cannot be the dummy
> {{ClassNotFoundProxySerializer}}).
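The three outcomes, together with the interim rule that any required state conversion fails the job, fit into a small decision table. The sketch below is hypothetical: `UpgradeResult`, `RestoreAction`, and `UpgradeDecision.decide` are illustrative names, not Flink API, and `conversionSupported` stands for the state conversion that is not available yet (so it is `false` for now).

```java
// Hypothetical names for the three upgrade outcomes described in the issue.
enum UpgradeResult { COMPATIBLE, SCHEMA_CHANGED, INCOMPATIBLE }

enum RestoreAction { RESTORE, CONVERT_STATE, FAIL }

final class UpgradeDecision {

    // previousIsPlaceholder: the old serializer could not be loaded and was replaced by a dummy.
    // conversionSupported: whether state conversion exists at all (currently it does not).
    static RestoreAction decide(UpgradeResult result, boolean previousIsPlaceholder, boolean conversionSupported) {
        switch (result) {
            case COMPATIBLE:
                return RestoreAction.RESTORE;
            case SCHEMA_CHANGED:
                // Conversion is needed, but the old serializer does not have to be present.
                return conversionSupported ? RestoreAction.CONVERT_STATE : RestoreAction.FAIL;
            case INCOMPATIBLE:
                // Conversion is needed and must read old state with the real old serializer.
                return (conversionSupported && !previousIsPlaceholder)
                        ? RestoreAction.CONVERT_STATE
                        : RestoreAction.FAIL;
            default:
                throw new IllegalArgumentException("Unknown upgrade result: " + result);
        }
    }
}
```

Keeping `conversionSupported` as an explicit input lets the same table describe both today's behavior (every conversion fails the job) and the intended behavior once conversion exists.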



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
