[ https://issues.apache.org/jira/browse/FLINK-6425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15999779#comment-15999779 ]

ASF GitHub Bot commented on FLINK-6425:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3834#discussion_r115139440

    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---
    @@ -161,7 +162,93 @@
         public abstract int hashCode();

    -    public boolean canRestoreFrom(TypeSerializer<?> other) {
    -        return equals(other);
    +    // --------------------------------------------------------------------------------------------
    +    // Serializer configuration snapshotting & reconfiguring
    +    // --------------------------------------------------------------------------------------------
    +
    +    /**
    +     * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
    +     * registered to (if any - this method is only relevant if this serializer is registered for serialization of
    +     * managed state).
    +     *
    +     * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
    +     * serialization format. When a new serializer is registered to serialize the same managed state that this
    +     * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
    +     * if any data migration needs to take place.
    +     *
    +     * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
    +     * configuration if they guarantee forwards compatibility. For example, implementations that use serialization
    +     * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or
    +     * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern.
    +     * By returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed
    +     * state serialized using this serializer is restored, there is no need to check for migration with the new
    +     * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the
    +     * serialized state.
    +     *
    +     * @see TypeSerializerConfigSnapshot
    +     * @see ForwardCompatibleSerializationFormatConfig
    +     *
    +     * @return snapshot of the serializer's current configuration.
    +     */
    +    public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
    +
    +    /**
    +     * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
    +     * serializer that was registered for serialization of the same managed state (if any - this method is only
    +     * relevant if this serializer is registered for serialization of managed state).
    +     *
    +     * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
    +     * <ul>
    +     * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
    +     * has been reconfigured to be compatible, to continue reading old data, and that the
    +     * serialization schema remains the same. No migration needs to be performed.</li>
    +     *
    +     * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
    +     * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
    +     * compatible, for old data.
    +     * Furthermore, in the case that the preceding serializer cannot be found or
    +     * restored to read the old data, the provided fallback deserializer can be used.</li>
    +     *
    +     * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
    +     * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
    +     * </ul>
    +     *
    +     * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the
    +     * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always
    +     * assume that the migration strategy is {@link MigrationStrategy#migrate()}.
    +     *
    +     * @see MigrationStrategy
    +     *
    +     * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
    +     *
    +     * @return the result of the reconfiguration.
    +     */
    +    protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
    +
    +    /**
    +     * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
    +     * serializer that was registered for serialization of the same managed state (if any - this method is only
    +     * relevant if this serializer is registered for serialization of managed state).
    +     *
    +     * <p>This method is not part of the public user-facing API, and cannot be overriden. External operations
    +     * providing a configuration snapshot of preceding serializer can only do so through this method.
    +     *
    +     * <p>This method always assumes that the migration strategy is {@link MigrationStrategy#noMigration()} if
    +     * the provided configuration snapshot is the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}.
    +     * Otherwise, the configuration snapshot is provided to the actual
    +     * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} (TypeSerializerConfigSnapshot)} implementation.
    +     *
    +     * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
    +     *
    +     * @return the result of the reconfiguration.
    +     */
    +    @Internal
    +    public final MigrationStrategy<T> getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) {
    +        // reference equality is viable here, because the forward compatible
    +        // marker config will always be explicitly restored with the singleton instance
    +        if (configSnapshot != ForwardCompatibleSerializationFormatConfig.INSTANCE) {
    --- End diff --

    I see the intention, but I think it is not a good idea to have this `final` default implementation around a `ForwardCompatibleSerializationFormatConfig`. Imagine a case where the old version of a serializer is `ForwardCompatible` and the new serializer which replaces it is not. Then this will always bypass the check, even if it shouldn't.


> Integrate serializer reconfiguration into state restore flow to activate serializer upgrades
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6425
>                 URL: https://issues.apache.org/jira/browse/FLINK-6425
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> With FLINK-6191, {{TypeSerializer}} will be reconfigurable.
> From the state backends' point of view, serializer reconfiguration doubles as
> a mechanism to determine how serializer upgrades should be handled.
> The general idea is that state checkpoints should contain the following as
> the state's metainfo:
> - the previous serializer
> - snapshot of the previous serializer's configuration
> The upgrade flow is as follows:
> 1. On restore, try to deserialize the previous serializer.
>    Deserialization may fail if a) the serializer no longer exists in the classpath,
>    or b) the serializer class is no longer valid (i.e., the implementation changed
>    and resulted in a different serialVersionUID).
>    In this case, use a dummy serializer as a placeholder. This dummy serializer
>    is currently the {{ClassNotFoundProxySerializer}} in the code.
> 2. Deserialize the configuration snapshot of the previous serializer. The
>    configuration snapshot must be successfully deserialized, otherwise the state
>    restore fails.
> 3. When we get the newly registered serializer for the state (could be a
>    completely new serializer, the same serializer with a different
>    implementation, or the exact same serializer untouched; either way it is
>    seen as a new serializer), we use the configuration snapshot of the old
>    serializer to reconfigure the new serializer.
> This completes the upgrade of the old serializer. However, depending on the
> result of the upgrade, state conversion may need to take place (for now, if
> state conversion is required, we just fail the job as this functionality
> isn't available yet). The results could be:
> - Compatible: restore success + serializer upgraded.
> - Compatible, but serialization schema changed: serializer upgraded but
>   requires state conversion, without the requirement that the old serializer
>   needs to be present.
> - Incompatible: serializer upgraded and requires state conversion, but also
>   requires the old serializer to be present (i.e., it cannot be the dummy
>   {{ClassNotFoundProxySerializer}}).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
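
For illustration only, the review concern above can be reproduced with a minimal Java sketch. Every type here (`Strategy`, `ConfigSnapshot`, `Serializer`, `StrictSerializer`) is a hypothetical, simplified stand-in and not Flink's actual class. The body of the `final` wrapper is also an assumption: the quoted diff ends at the `if`, so the fall-through to "no migration" is taken from the Javadoc rather than from the code.

```java
// Hypothetical, simplified stand-ins for the types discussed in the diff.
// None of these are Flink's real classes.
final class MigrationBypassSketch {

    /** Stand-in for the outcomes a serializer can report (fallback variant omitted). */
    enum Strategy { NO_MIGRATION, MIGRATE }

    /** Stand-in for a serializer configuration snapshot. */
    static class ConfigSnapshot {
        final String format;
        ConfigSnapshot(String format) { this.format = format; }
    }

    /** Marker singleton playing the role of the forward-compatible config instance. */
    static final ConfigSnapshot FORWARD_COMPATIBLE = new ConfigSnapshot("forward-compatible");

    abstract static class Serializer {
        /** The check that a concrete serializer implements. */
        protected abstract Strategy getMigrationStrategy(ConfigSnapshot previous);

        /**
         * Same shape as the final wrapper in the diff (assumed body): the marker
         * short-circuits the check, so the new serializer is never consulted.
         */
        public final Strategy getMigrationStrategyFor(ConfigSnapshot previous) {
            if (previous != FORWARD_COMPATIBLE) {
                return getMigrationStrategy(previous);
            }
            return Strategy.NO_MIGRATION;
        }
    }

    /** A new serializer that can only read its own format and is not forward compatible. */
    static class StrictSerializer extends Serializer {
        @Override
        protected Strategy getMigrationStrategy(ConfigSnapshot previous) {
            return "strict-v1".equals(previous.format) ? Strategy.NO_MIGRATION : Strategy.MIGRATE;
        }
    }

    public static void main(String[] args) {
        Serializer newSerializer = new StrictSerializer();
        // An ordinary old snapshot reaches the serializer's own check.
        System.out.println(newSerializer.getMigrationStrategyFor(new ConfigSnapshot("legacy-v0")));
        // The marker from the old serializer bypasses that check entirely.
        System.out.println(newSerializer.getMigrationStrategyFor(FORWARD_COMPATIBLE));
    }
}
```

In this sketch the strict serializer would ask for migration for any format other than its own, yet the wrapper reports that no migration is needed whenever the old serializer left the forward-compatible marker behind, and the subclass has no way to override that decision.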