Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3834#discussion_r115141563 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java --- @@ -161,7 +162,93 @@ public abstract int hashCode(); - public boolean canRestoreFrom(TypeSerializer<?> other) { - return equals(other); + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & reconfiguring + // -------------------------------------------------------------------------------------------- + + /** + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is + * registered to (if any - this method is only relevant if this serializer is registered for serialization of + * managed state). + * + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its + * serialization format. When a new serializer is registered to serialize the same managed state that this + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer + * if any data migration needs to take place. + * + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization + * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or + * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern. By + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed + * state serialized using this serializer is restored, there is no need to check for migration with the new + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the + * serialized state. + * + * @see TypeSerializerConfigSnapshot + * @see ForwardCompatibleSerializationFormatConfig + * + * @return snapshot of the serializer's current configuration. + */ + public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); + + /** + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding + * serializer that was registered for serialization of the same managed state (if any - this method is only + * relevant if this serializer is registered for serialization of managed state). + * + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following: + * <ul> + * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or + * has been reconfigured to be compatible, to continue reading old data, and that the + * serialization schema remains the same. No migration needs to be performed.</li> + * + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be + * compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or + * restored to read the old data, the provided fallback deserializer can be used.</li> + * + * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li> + * </ul> + * + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always + * assume that the migration strategy is {@link MigrationStrategy#migrate()}. + * + * @see MigrationStrategy + * + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state + * + * @return the result of the reconfiguration. + */ + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot); + + /** + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding + * serializer that was registered for serialization of the same managed state (if any - this method is only + * relevant if this serializer is registered for serialization of managed state). + * + * <p>This method is not part of the public user-facing API, and cannot be overriden. External operations + * providing a configuration snapshot of preceding serializer can only do so through this method. + * + * <p>This method always assumes that the migration strategy is {@link MigrationStrategy#noMigration()} if + * the provided configuration snapshot is the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}. + * Otherwise, the configuration snapshot is provided to the actual + * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} (TypeSerializerConfigSnapshot)} implementation. + * + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state + * + * @return the result of the reconfiguration. + */ + @Internal + public final MigrationStrategy<T> getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) { + // reference equality is viable here, because the forward compatible + // marker config will always be explicitly restored with the singleton instance + if (configSnapshot != ForwardCompatibleSerializationFormatConfig.INSTANCE) { --- End diff -- I agree. I've removed the shortcut and have only a single public abstract method now.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---