[ https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627517#comment-16627517 ]
ASF GitHub Bot commented on FLINK-9377: --------------------------------------- dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints URL: https://github.com/apache/flink/pull/6711#discussion_r220227910 ########## File path: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java ########## @@ -21,35 +21,131 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; +import java.io.IOException; + /** * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer's} configuration. - * The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the - * serializer is registered to. + * The configuration snapshot of a serializer is persisted within checkpoints + * as a single source of meta information about the schema of serialized data in the checkpoint. + * This serves three purposes: + * + * <ul> + * <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot + * represents information about the parameters, state, and schema of a serializer. + * This is explained in more detail below.</li> * - * <p>The persisted configuration may later on be used by new serializers to ensure serialization compatibility - * for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot - * should encode sufficient information about: + * <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available, + * they need to be checked whether or not they are compatible to read the data written by the previous serializer. + * This is performed by providing the new serializer to the correspondibng serializer configuration + * snapshots in checkpoints.</li> + * + * <li><strong>Factory for a read serializer when schema conversion is required:<strong> in the case that new + * serializers are not compatible to read previous data, a schema conversion process executed across all data + * is required before the new serializer can be continued to be used. This conversion process requires a compatible + * read serializer to restore serialized bytes as objects, and then written back again using the new serializer. + * In this scenario, the serializer configuration snapshots in checkpoints doubles as a factory for the read + * serializer of the conversion process.</li> + * </ul> + * + * <h2>Serializer Configuration and Schema</h2> + * + * <p>Since serializer configuration snapshots needs to be used to ensure serialization compatibility + * for the same managed state as well as serving as a factory for compatible read serializers, the configuration + * snapshot should encode sufficient information about: * * <ul> * <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings * required to setup the serializer, or the state of the serializer if it is stateful. If the serializer * has nested serializers, then the configuration snapshot should also contain the parameters of the nested * serializers.</li> * - * <li><strong>Serialization schema of the serializer:</strong> the data format used by the serializer.</li> + * <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or + * in other words, the schema of data written by the serializer.</li> * </ul> * * <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to * deserialize the configuration snapshot from its binary form. + * + * @param <T> The data type that the originating serializer of this configuration serializes. */ @PublicEvolving -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { +public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable { /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; + /** + * The originating serializer of this configuration snapshot. + * + * TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses, + * TODO we currently have a base implementation for the {@link #restoreSerializer()} + * TODO method which simply returns this serializer instance. The serializer is written + * TODO and read using Java serialization as part of reading / writing the config snapshot + */ + private TypeSerializer<T> serializer; + + /** + * Creates a serializer using this configuration, that is capable of reading data + * written by the serializer described by this configuration. + * + * @return the restored serializer. + */ + public TypeSerializer<T> restoreSerializer() { + // TODO this implementation is only a placeholder; the intention is to have no default implementation + return serializer; + } + + /** + * Set the originating serializer of this configuration snapshot. + * + * TODO this method is a temporary workaround to inject the serializer instance to + * TODO be returned by the restoreSerializer() method. + */ + @Internal + public final void setSerializer(TypeSerializer<T> serializer) { + this.serializer = Preconditions.checkNotNull(serializer); + } + + /** + * Checks whether a new serializer is compatible to read data written be the originating serializer of this + * config snapshot; i.e. whether or not a new serializer is compatible with the previous serializer. + * + * @param newSerializer the new serializer to check against for schema compatibility. + * + * @return the resolve schema compatibility result. + */ + public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<?> newSerializer) { + @SuppressWarnings("unchecked") + TypeSerializer<T> castedSerializer = ((TypeSerializer<T>) newSerializer); + + if (castedSerializer.ensureCompatibility(this).isRequiresMigration()) { + return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); + } else { + return TypeSerializerSchemaCompatibility.compatibleAfterReconfiguration(castedSerializer); + } + } + + @Override + public void write(DataOutputView out) throws IOException { + // bump the version; we use this to know that there is a serializer to read as part of the config + out.writeInt(getVersion() + 1); Review comment: Does it mean that we will have a out-of-order versioning? Assume old(pre-1.6) have version 1, this will write version 3 (2+1), than after this whole story with state migration is done we will have to go directly to 4. Is my understanding correct? If so I would be against merging it in current shape, but merge after the whole epic is completed, or would think of some other workaround ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove writing serializers as part of the checkpoint meta information > --------------------------------------------------------------------- > > Key: FLINK-9377 > URL: https://issues.apache.org/jira/browse/FLINK-9377 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > When writing meta information of a state in savepoints, we currently write > both the state serializer as well as the state serializer's configuration > snapshot. > Writing both is actually redundant, as most of the time they have identical > information. > Moreover, the fact that we use Java serialization to write the serializer > and rely on it to be re-readable on the restore run, already poses problems > for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) > to perform even a compatible upgrade. > The proposal here is to leave only the config snapshot as meta information, > and use that as the single source of truth of information about the schema of > serialized state. > The config snapshot should be treated as a factory (or provided to a > factory) to re-create serializers capable of reading old, serialized state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)