[ 
https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999779#comment-15999779
 ] 

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139440
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 ---
    @@ -161,7 +162,93 @@
     
        public abstract int hashCode();
     
    -   public boolean canRestoreFrom(TypeSerializer<?> other) {
    -           return equals(other);
    +   // 
--------------------------------------------------------------------------------------------
    +   // Serializer configuration snapshotting & reconfiguring
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
    +    * registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
    +    * managed state).
    +    *
    +    * <p>The configuration snapshot should contain information about the 
serializer's parameter settings and its
    +    * serialization format. When a new serializer is registered to 
serialize the same managed state that this
    +    * serializer was registered to, the returned configuration snapshot 
can be used to check with the new serializer
    +    * if any data migration needs to take place.
    +    *
    +    * <p>Implementations can also return the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}
    +    * configuration if they guarantee forwards compatibility. For example, 
implementations that use serialization
    +    * frameworks with built-in serialization compatibility, such as <a 
href=https://thrift.apache.org/>Thrift</a> or
    +    * <a 
href=https://developers.google.com/protocol-buffers/>Protobuf</a>, are suitable 
for this usage pattern. By
    +    * returning the {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that 
when managed
    +    * state serialized using this serializer is restored, there is no need 
to check for migration with the new
    +    * serializer for the same state. In other words, new serializers are 
always assumed to be fully compatible for the
    +    * serialized state.
    +    *
    +    * @see TypeSerializerConfigSnapshot
    +    * @see ForwardCompatibleSerializationFormatConfig
    +    *
    +    * @return snapshot of the serializer's current configuration.
    +    */
    +   public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +
    +   /**
    +    * Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
    +    * serializer that was registered for serialization of the same managed 
state (if any - this method is only
    +    * relevant if this serializer is registered for serialization of 
managed state).
    +    *
    +    * <p>Implementations need to return the resolved migration strategy. 
The strategy can be one of the following:
    +    * <ul>
    +    *     <li>{@link MigrationStrategy#noMigration()}: this signals Flink 
that this serializer is compatible, or
    +    *     has been reconfigured to be compatible, to continue reading old 
data, and that the
    +    *     serialization schema remains the same. No migration needs to be 
performed.</li>
    +    *
    +    *     <li>{@link 
MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this 
signals Flink that
    +    *     migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
    +    *     compatible, for old data. Furthermore, in the case that the 
preceding serializer cannot be found or
    +    *     restored to read the old data, the provided fallback 
deserializer can be used.</li>
    +    *
    +    *     <li>{@link MigrationStrategy#migrate()}: this signals Flink that 
migration needs to be performed, because
    +    *     this serializer is not compatible, or cannot be reconfigured to 
be compatible, for old data.</li>
    +    * </ul>
    +    *
    +    * <p>This method is guaranteed to only be invoked if the preceding 
serializer's configuration snapshot is not the
    +    * singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such 
cases, Flink always
    +    * assume that the migration strategy is {@link 
MigrationStrategy#migrate()}.
    +    *
    +    * @see MigrationStrategy
    +    *
    +    * @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
    +    *
    +    * @return the result of the reconfiguration.
    +    */
    +   protected abstract MigrationStrategy<T> 
getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
    +
    +   /**
    +    * Get the migration strategy to use this serializer based on the 
configuration snapshot of a preceding
    +    * serializer that was registered for serialization of the same managed 
state (if any - this method is only
    +    * relevant if this serializer is registered for serialization of 
managed state).
    +    *
    +    * <p>This method is not part of the public user-facing API, and cannot 
be overridden. External operations
    +    * providing a configuration snapshot of preceding serializer can only 
do so through this method.
    +    *
    +    * <p>This method always assumes that the migration strategy is {@link 
MigrationStrategy#noMigration()} if
    +    * the provided configuration snapshot is the singleton {@link 
ForwardCompatibleSerializationFormatConfig#INSTANCE}.
    +    * Otherwise, the configuration snapshot is provided to the actual
    +    * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} 
implementation.
    +    *
    +    * @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
    +    *
    +    * @return the result of the reconfiguration.
    +    */
    +   @Internal
    +   public final MigrationStrategy<T> 
getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) {
    +           // reference equality is viable here, because the forward 
compatible
    +           // marker config will always be explicitly restored with the 
singleton instance
    +           if (configSnapshot != 
ForwardCompatibleSerializationFormatConfig.INSTANCE) {
    --- End diff --
    
    I see the intention, but I think it is not a good idea to have this 
`final` default implementation around a 
`ForwardCompatibleSerializationFormatConfig`.
    
    Imagine a case where the old version of a serializer is `ForwardCompatible` 
and the new serializer that replaces it is not. Then this wrapper always 
bypasses the new serializer's check and assumes `noMigration()`, even when it 
shouldn't.
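
To make the concern concrete, here is a minimal sketch of the bypass. It uses simplified stand-in types (`Strategy`, `ConfigSnapshot`, `Serializer`, `FORWARD_COMPATIBLE` are hypothetical names for illustration, not the classes in the pull request) and assumes the wrapper behaves as the Javadoc in the diff describes.

```java
// Simplified stand-ins for the types in the diff; these are NOT Flink's classes.
public class ForwardCompatibleBypassSketch {

    enum Strategy { NO_MIGRATION, MIGRATE }

    static class ConfigSnapshot {}

    // Marker singleton, analogous to ForwardCompatibleSerializationFormatConfig.INSTANCE.
    static final ConfigSnapshot FORWARD_COMPATIBLE = new ConfigSnapshot();

    abstract static class Serializer {
        // Subclass hook, analogous to getMigrationStrategy(...).
        protected abstract Strategy getMigrationStrategy(ConfigSnapshot previous);

        // Final wrapper, analogous to getMigrationStrategyFor(...).
        public final Strategy getMigrationStrategyFor(ConfigSnapshot previous) {
            if (previous != FORWARD_COMPATIBLE) {
                return getMigrationStrategy(previous);
            }
            // The new serializer is never consulted on this path.
            return Strategy.NO_MIGRATION;
        }
    }

    // A replacement serializer whose format is not forward compatible:
    // it would always ask for migration if it were consulted.
    static class NewSerializer extends Serializer {
        @Override
        protected Strategy getMigrationStrategy(ConfigSnapshot previous) {
            return Strategy.MIGRATE;
        }
    }

    public static void main(String[] args) {
        Serializer s = new NewSerializer();
        // Old serializer was forward compatible: the check is skipped.
        System.out.println(s.getMigrationStrategyFor(FORWARD_COMPATIBLE)); // NO_MIGRATION
        // Old serializer had a regular snapshot: the new serializer decides.
        System.out.println(s.getMigrationStrategyFor(new ConfigSnapshot())); // MIGRATE
    }
}
```

Because the wrapper is `final`, `NewSerializer` has no way to override the first outcome.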


> Integrate serializer reconfiguration into state restore flow to activate 
> serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as 
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as 
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer. 
> Deserialization may fail if a) the serializer no longer exists on the classpath, 
> or b) the serializer class is no longer valid (i.e., the implementation changed 
> and resulted in a different serialVersionUID). In this case, use a dummy 
> serializer as a placeholder. This dummy serializer is currently the 
> {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous old serializer. The 
> configuration snapshot must be successfully deserialized, otherwise the state 
> restore fails.
> 3. When we get the new registered serializer for the state (could be a 
> completely new serializer, the same serializer with different 
> implementations, or the exact same serializer untouched; either way they are 
> seen as a new serializer), we use the configuration snapshot of the old 
> serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer.  However, depending on the 
> result of the upgrade, state conversion needs to take place (for now, if 
> state conversion is required, we just fail the job as this functionality 
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but 
> requires state conversion, without the requirement that the old serializer 
> needs to be present.
> - Incompatible: serializer upgraded, but state conversion is required and 
> needs the old serializer to be present (i.e., it cannot be the dummy 
> {{ClassNotFoundProxySerializer}}).
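
The three outcomes listed above can be sketched as a small decision function. This is only an illustration under stated assumptions: `Compatibility`, `Outcome`, and `resolve` are hypothetical names, and the sketch models the intended end state in which state conversion exists, whereas the description says the job currently just fails whenever conversion is required.

```java
public class RestoreFlowSketch {

    // Result of reconfiguring the new serializer with the old config snapshot.
    enum Compatibility { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

    // What the restore path would do next (hypothetical names).
    enum Outcome { RESTORED, CONVERT_STATE, FAIL }

    // oldSerializerPresent is false when the previous serializer could not be
    // deserialized and the placeholder (dummy) serializer was used instead.
    static Outcome resolve(Compatibility result, boolean oldSerializerPresent) {
        switch (result) {
            case COMPATIBLE:
                // Restore succeeds, serializer upgraded.
                return Outcome.RESTORED;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion needed, but the old serializer is not required.
                return Outcome.CONVERT_STATE;
            case INCOMPATIBLE:
                // Conversion needed and only possible with the old serializer.
                return oldSerializerPresent ? Outcome.CONVERT_STATE : Outcome.FAIL;
            default:
                throw new IllegalStateException("unknown result: " + result);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Compatibility.COMPATIBLE, false));
        System.out.println(resolve(Compatibility.COMPATIBLE_SCHEMA_CHANGED, false));
        System.out.println(resolve(Compatibility.INCOMPATIBLE, true));
        System.out.println(resolve(Compatibility.INCOMPATIBLE, false));
    }
}
```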



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
