tzulitai closed pull request #6930: [FLINK-10679] Remove deprecated 
CompatibilityResult and related classes from framework code
URL: https://github.com/apache/flink/pull/6930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
index 49d03db9830..4b72902264f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleSerializerSnapshot.java
@@ -24,6 +24,7 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 
 /**
@@ -48,13 +49,13 @@ public 
BackwardsCompatibleSerializerSnapshot(TypeSerializer<T> serializerInstanc
        }
 
        @Override
-       public void write(DataOutputView out) throws IOException {
+       public void writeSnapshot(DataOutputView out) throws IOException {
                throw new UnsupportedOperationException(
                        "This is a dummy config snapshot used only for 
backwards compatibility.");
        }
 
        @Override
-       public void read(int version, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
+       public void readSnapshot(int version, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
                throw new UnsupportedOperationException(
                        "This is a dummy config snapshot used only for 
backwards compatibility.");
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
index a188a4d23db..ae19d038c39 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityUtil.java
@@ -68,7 +68,9 @@
                if (precedingSerializerConfigSnapshot != null
                        && !(precedingSerializerConfigSnapshot instanceof 
BackwardsCompatibleSerializerSnapshot)) {
 
-                       CompatibilityResult<T> initialResult = 
newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+                       CompatibilityResult<T> initialResult = 
resolveCompatibilityResult(
+                                       (TypeSerializerSnapshot<T>) 
precedingSerializerConfigSnapshot,
+                                       newSerializer);
 
                        if (!initialResult.isRequiresMigration()) {
                                return initialResult;
@@ -89,4 +91,19 @@
                }
        }
 
+       public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+                       TypeSerializerSnapshot<T> 
precedingSerializerConfigSnapshot,
+                       TypeSerializer<T> newSerializer) {
+
+               TypeSerializerSchemaCompatibility<T, TypeSerializer<T>> 
compatibility =
+                               
precedingSerializerConfigSnapshot.resolveSchemaCompatibility(newSerializer);
+
+               // everything except "compatible" maps to "requires migration".
+               // at the entry point of the new-to-old-bridge (in the 
TypeSerializerConfigSnapshot), we
+               // interpret "requiresMigration" as 'incompatible'. That is a 
precaution because
+               // serializers could previously not specify the 'incompatible' 
case.
+               return compatibility.isCompatibleAsIs() ?
+                               CompatibilityResult.compatible() :
+                               CompatibilityResult.requiresMigration();
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..ddb0b87e525 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -29,11 +28,37 @@
 /**
  * This interface describes the methods that are required for a data type to 
be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and 
copying methods.
- * <p>
- * The methods in this class are assumed to be stateless, such that it is 
effectively thread safe. Stateful
+ *
+ * <p>The methods in this class are assumed to be stateless, such that it is 
effectively thread safe. Stateful
  * implementations of the methods may lead to unpredictable side effects and 
will compromise both stability and
  * correctness of the program.
- * 
+ *
+ * <p><b>Upgrading TypeSerializers to the new TypeSerializerSnapshot model</b>
+ *
+ * <p>This section is relevant if you implemented a TypeSerializer in Flink 
versions up to 1.6 and want
+ * to adapt that implementation to the new interfaces that support proper 
state schema evolution. Please
+ * follow these steps:
+ *
+ * <ul>
+ *     <li>Change the type serializer's config snapshot to implement {@link 
TypeSerializerSnapshot}, rather
+ *     than extending {@code TypeSerializerConfigSnapshot} (as previously).
+ *     <li>Move the compatibility check from the {@link 
TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)}
+ *     method to the {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)} method.
+ * </ul>
+ *
+ * <p><b>Maintaining Backwards Compatibility</b>
+ *
+ * <p>If you want your serializer to be able to restore checkpoints from Flink 
1.6 and before, add the steps
+ * below in addition to the steps above.
+ *
+ * <ul>
+ *     <li>Retain the old serializer snapshot class (extending {@code 
TypeSerializerConfigSnapshot}) under
+ *     the same name and give the updated serializer snapshot class (the one 
extending {@code TypeSerializerSnapshot})
+ *     a new name.
+ *     <li>Keep the {@link 
TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)} on the 
TypeSerializer
+ *     as well.
+ * </ul>
+ *
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
@@ -163,85 +188,55 @@
        public abstract int hashCode();
 
        // 
--------------------------------------------------------------------------------------------
-       // Serializer configuration snapshotting & compatibility
+       // Serializer configuration snapshot for checkpoints/savepoints
        // 
--------------------------------------------------------------------------------------------
 
        /**
-        * Create a snapshot of the serializer's current configuration to be 
stored along with the managed state it is
-        * registered to (if any - this method is only relevant if this 
serializer is registered for serialization of
-        * managed state).
+        * Snapshots the configuration of this TypeSerializer. This method is 
only relevant if the serializer is
+        * used to serialize state stored in checkpoints/savepoints.
         *
-        * <p>The configuration snapshot should contain information about the 
serializer's parameter settings and its
-        * serialization format. When a new serializer is registered to 
serialize the same managed state that this
-        * serializer was registered to, the returned configuration snapshot 
can be used to ensure compatibility
-        * of the new serializer and determine if state migration is required.
+        * <p>The snapshot of the TypeSerializer is supposed to contain all 
information that affects the serialization
+        * format of the serializer. The snapshot serves two purposes: First, 
to reproduce the serializer when the
+        * checkpoint/savepoint is restored, and second, to check whether the 
serialization format is compatible
+        * with the serializer used in the restored program.
         *
-        * @see TypeSerializerSnapshot
+        * <p><b>IMPORTANT:</b> TypeSerializerSnapshots changed after Flink 
1.6. Serializers implemented against
+        * Flink versions up to 1.6 should still work, but should adjust to the new model 
to enable state evolution and be
+        * future-proof.
+        * See the class-level comments, section "Upgrading TypeSerializers to 
the new TypeSerializerSnapshot model"
+        * for details.
+        *
+        * @see 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
         *
         * @return snapshot of the serializer's current configuration (cannot 
be {@code null}).
         */
        public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Deprecated methods for backwards compatibility
+       // 
--------------------------------------------------------------------------------------------
+
        /**
-        * Ensure compatibility of this serializer with a preceding serializer 
that was registered for serialization of
-        * the same managed state (if any - this method is only relevant if 
this serializer is registered for
-        * serialization of managed state).
-        *
-        * <p>The compatibility check in this method should be performed by 
inspecting the preceding serializer's configuration
-        * snapshot. The method may reconfigure the serializer (if required and 
possible) so that it may be compatible,
-        * or provide a signaling result that informs Flink that state 
migration is necessary before continuing to use
-        * this serializer.
-        *
-        * <p>The result can be one of the following:
-        * <ul>
-        *     <li>{@link CompatibilityResult#compatible()}: this signals Flink 
that this serializer is compatible, or
-        *     has been reconfigured to be compatible, to continue reading 
previous data, and that the
-        *     serialization schema remains the same. No migration needs to be 
performed.</li>
-        *
-        *     <li>{@link 
CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink 
that
-        *     migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
-        *     compatible, for previous data. Furthermore, in the case that the 
preceding serializer cannot be found or
-        *     restored to read the previous data to perform the migration, the 
provided convert deserializer can be
-        *     used as a fallback resort.</li>
+        * This method is deprecated. It used to resolve compatibility of the 
serializer with serializer
+        * config snapshots in checkpoints. The responsibility for this has 
moved to
+        * {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
         *
-        *     <li>{@link CompatibilityResult#requiresMigration()}: this 
signals Flink that migration needs to be
-        *     performed, because this serializer is not compatible, or cannot 
be reconfigured to be compatible, for
-        *     previous data. If the preceding serializer cannot be found 
(either its implementation changed or it was
-        *     removed from the classpath) then the migration will fail due to 
incapability to read previous data.</li>
-        * </ul>
+        * <p>New serializers should not override this method any more! 
Serializers implemented against Flink
+        * versions up to 1.6 should still work, but should adjust to new model 
to enable state evolution and
+        * be future-proof. See the class-level comments, section <i>"Upgrading 
TypeSerializers to the new
+        * TypeSerializerSnapshot model"</i> for details.
         *
-        * @see CompatibilityResult
-        *
-        * @param configSnapshot configuration snapshot of a preceding 
serializer for the same managed state
-        *
-        * @return the determined compatibility result (cannot be {@code null}).
+        * @deprecated Replaced by {@link 
TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)}.
         */
        @Deprecated
        public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               throw new IllegalStateException(
-                       "Seems like that you are still using 
TypeSerializerConfigSnapshot; if so, this method must be implemented. " +
-                               "Once you change to directly use 
TypeSerializerSnapshot, then you can safely remove the implementation " +
-                               "of this method.");
-       }
-
-       @Internal
-       public final CompatibilityResult<T> 
ensureCompatibility(TypeSerializerSnapshot<?> configSnapshot) {
-               if (configSnapshot instanceof TypeSerializerConfigSnapshot) {
-                       return 
ensureCompatibility((TypeSerializerConfigSnapshot<?>) configSnapshot);
-               } else {
-                       @SuppressWarnings("unchecked")
-                       TypeSerializerSnapshot<T> casted = 
(TypeSerializerSnapshot<T>) configSnapshot;
-
-                       TypeSerializerSchemaCompatibility<T, ? extends 
TypeSerializer<T>> compat = casted.resolveSchemaCompatibility(this);
-                       if (compat.isCompatibleAsIs()) {
-                               return CompatibilityResult.compatible();
-                       } else if (compat.isCompatibleAfterMigration()) {
-                               return CompatibilityResult.requiresMigration();
-                       } else if (compat.isIncompatible()) {
-                               throw new IllegalStateException("The new 
serializer is incompatible.");
-                       } else {
-                               throw new IllegalStateException("Unidentifiable 
schema compatibility type. This is a bug, please file a JIRA.");
-                       }
-               }
+               throw new UnsupportedOperationException(
+                               "This method is not supported any more - please 
evolve your TypeSerializer the following way:\n\n" +
+                               "  - If you have a serializer whose 
'ensureCompatibility()' method delegates to another\n" +
+                               "    serializer's 'ensureCompatibility()', 
please use" +
+                                               
"'CompatibilityUtil.resolveCompatibilityResult(snapshot, this)' instead.\n\n" +
+                               "  - If you updated your serializer (removed 
overriding the 'ensureCompatibility()' method),\n" +
+                               "    please also update the corresponding 
config snapshot to not extend 'TypeSerializerConfigSnapshot'" +
+                                               "any more.\n\n");
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 236b994b295..9ae275a056a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -22,92 +22,39 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
- * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted within checkpoints
- * as a single source of meta information about the schema of serialized data 
in the checkpoint.
- * This serves three purposes:
- *
- * <ul>
- *   <li><strong>Capturing serializer parameters and schema:</strong> a 
serializer's configuration snapshot
- *   represents information about the parameters, state, and schema of a 
serializer.
- *   This is explained in more detail below.</li>
- *
- *   <li><strong>Compatibility checks for new serializers:</strong> when new 
serializers are available,
- *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
- *   This is performed by providing the new serializer to the corresponding 
serializer configuration
- *   snapshots in checkpoints.</li>
- *
- *   <li><strong>Factory for a read serializer when schema conversion is 
required:<strong> in the case that new
- *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
- *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
- *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
- *   In this scenario, the serializer configuration snapshots in checkpoints 
doubles as a factory for the read
- *   serializer of the conversion process.</li>
- * </ul>
- *
- * <h2>Serializer Configuration and Schema</h2>
- *
- * <p>Since serializer configuration snapshots needs to be used to ensure 
serialization compatibility
- * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
- * snapshot should encode sufficient information about:
- *
- * <ul>
- *   <li><strong>Parameter settings of the serializer:</strong> parameters of 
the serializer include settings
- *   required to setup the serializer, or the state of the serializer if it is 
stateful. If the serializer
- *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
- *   serializers.</li>
+ * This class bridges between the old serializer config snapshot interface 
(this class) and the new
+ * serializer config snapshot interface ({@link TypeSerializerSnapshot}).
  *
- *   <li><strong>Serialization schema of the serializer:</strong> the binary 
format used by the serializer, or
- *   in other words, the schema of data written by the serializer.</li>
- * </ul>
- *
- * <p>NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
- * deserialize the configuration snapshot from its binary form.
- *
- * @param <T> The data type that the originating serializer of this 
configuration serializes.
- *
- * @deprecated This class has been deprecated since Flink 1.7, and will 
eventually be removed.
- *             Please refer to, and directly implement a {@link 
TypeSerializerSnapshot} instead.
- *             Class-level Javadocs of {@link TypeSerializerSnapshot} provides 
more details
- *             on migrating to the new interface.
+ * <p>Serializers that create snapshots and compatibility checks with the old 
interfaces extend this class
+ * and should migrate to extend {@code TypeSerializerSnapshot} to properly 
support state evolution/migration
+ * and be future-proof.
  */
 @PublicEvolving
 @Deprecated
 public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable implements TypeSerializerSnapshot<T> {
 
+       /** Version / Magic number for the format that bridges between the old 
and new interface. */
+       private static final int ADAPTER_VERSION = 0x7a53c4f0;
+
        /** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
        private ClassLoader userCodeClassLoader;
 
-       /**
-        * The originating serializer of this configuration snapshot.
-        */
+       /** The originating serializer of this configuration snapshot. */
        private TypeSerializer<T> serializer;
 
-       /**
-        * Creates a serializer using this configuration, that is capable of 
reading data
-        * written by the serializer described by this configuration.
-        *
-        * @return the restored serializer.
-        */
-       public TypeSerializer<T> restoreSerializer() {
-               if (serializer != null) {
-                       return this.serializer;
-               } else {
-                       throw new IllegalStateException("Trying to restore the 
prior serializer via TypeSerializerConfigSnapshot, " +
-                               "but the prior serializer has not been set.");
-               }
-       }
-
        /**
         * Set the originating serializer of this configuration snapshot.
         */
        @Internal
-       public void setPriorSerializer(TypeSerializer<T> serializer) {
+       public final void setPriorSerializer(TypeSerializer<T> serializer) {
                this.serializer = Preconditions.checkNotNull(serializer);
        }
 
@@ -135,26 +82,77 @@ public final ClassLoader getUserCodeClassLoader() {
                return userCodeClassLoader;
        }
 
-       public abstract boolean equals(Object obj);
-
-       public abstract int hashCode();
-
        // 
----------------------------------------------------------------------------
-       //  Irrelevant methods; these methods should only ever be used when the 
new interface is directly implemented.
+       //  Implementation of the TypeSerializerSnapshot interface
        // 
----------------------------------------------------------------------------
 
        @Override
-       public int getCurrentVersion() {
-               throw new UnsupportedOperationException();
+       public final int getCurrentVersion() {
+               return ADAPTER_VERSION;
+       }
+
+       @Override
+       public final void writeSnapshot(DataOutputView out) throws IOException {
+               checkState(serializer != null, "the prior serializer has not 
been set on this");
+
+               // write the snapshot for a non-updated serializer.
+               // this mimics the previous behavior where the TypeSerializer 
was
+               // Java-serialized, for backwards compatibility
+               TypeSerializerSerializationUtil.writeSerializer(out, 
serializer);
+
+               // now delegate to the snapshot's own writing code
+               write(out);
+       }
+
+       @Override
+       public final void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
+               if (readVersion != ADAPTER_VERSION) {
+                       throw new IOException("Wrong/unexpected version for the 
TypeSerializerConfigSnapshot: " + readVersion);
+               }
+
+               serializer = 
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, 
true);
+
+               // now delegate to the snapshot's own reading code
+               setUserCodeClassLoader(userCodeClassLoader);
+               read(in);
        }
 
+       /**
+        * Creates a serializer using this configuration, that is capable of 
reading data
+        * written by the serializer described by this configuration.
+        *
+        * @return the restored serializer.
+        */
        @Override
-       public final void read(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
-               throw new UnsupportedOperationException();
+       public final TypeSerializer<T> restoreSerializer() {
+               if (serializer == null) {
+                       throw new IllegalStateException(
+                                       "Trying to restore the prior serializer 
via TypeSerializerConfigSnapshot, " +
+                                       "but the prior serializer has not been 
set.");
+               }
+               else if (serializer instanceof UnloadableDummyTypeSerializer) {
+                       Throwable originalError = 
((UnloadableDummyTypeSerializer<?>) serializer).getOriginalError();
+
+                       throw new IllegalStateException(
+                                       "Could not Java-deserialize 
TypeSerializer while restoring checkpoint metadata for serializer " +
+                                       "snapshot '" + getClass().getName() + 
"'. " +
+                                       "Please update to the 
TypeSerializerSnapshot interface that removes Java Serialization to avoid " +
+                                       "this problem in the future.", 
originalError);
+               } else {
+                       return this.serializer;
+               }
        }
 
        @Override
-       public final <NS extends TypeSerializer<T>> 
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS 
newSerializer) {
-               throw new UnsupportedOperationException();
+       public final <NS extends TypeSerializer<T>> 
TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+                       NS newSerializer) {
+
+               // in prior versions, the compatibility check was in the 
serializer itself, so we
+               // delegate this call to the serializer.
+               final CompatibilityResult<T> compatibility = 
newSerializer.ensureCompatibility(this);
+
+               return compatibility.isRequiresMigration() ?
+                               
TypeSerializerSchemaCompatibility.incompatible() :
+                               
TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index e7cdc382f60..7a4ee5bd606 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -118,7 +118,7 @@
                } catch (UnloadableTypeSerializerException e) {
                        if (useDummyPlaceholder) {
                                LOG.warn("Could not read a requested 
serializer. Replaced with a UnloadableDummyTypeSerializer.", e.getCause());
-                               return new 
UnloadableDummyTypeSerializer<>(e.getSerializerBytes());
+                               return new 
UnloadableDummyTypeSerializer<>(e.getSerializerBytes(), e.getCause());
                        } else {
                                throw e;
                        }
@@ -232,8 +232,6 @@ public static void writeSerializersAndConfigsWithResilience(
         */
        public static final class TypeSerializerSerializationProxy<T> extends 
VersionedIOReadableWritable {
 
-               private static final Logger LOG = 
LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
-
                private static final int VERSION = 1;
 
                private ClassLoader userClassLoader;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
index 067d43d480d..4715d1119bf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -25,7 +25,7 @@
 import java.io.IOException;
 
 /**
- * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
+ * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link 
TypeSerializer}'s configuration.
  * The configuration snapshot of a serializer is persisted within checkpoints
  * as a single source of meta information about the schema of serialized data 
in the checkpoint.
  * This serves three purposes:
@@ -40,7 +40,7 @@
  *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
  *   snapshots in checkpoints.</li>
  *
- *   <li><strong>Factory for a read serializer when schema conversion is 
required:<strong> in the case that new
+ *   <li><strong>Factory for a read serializer when schema conversion is 
required:</strong> in the case that new
  *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
  *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
  *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
@@ -86,9 +86,9 @@
         *
         * @param out the {@link DataOutputView} to write the snapshot to.
         *
-        * @throws IOException
+        * @throws IOException Thrown if the snapshot data could not be written.
         */
-       void write(DataOutputView out) throws IOException;
+       void writeSnapshot(DataOutputView out) throws IOException;
 
        /**
         * Reads the serializer snapshot from the provided {@link 
DataInputView}.
@@ -100,9 +100,9 @@
         * @param in the {@link DataInputView} to read the snapshot from.
         * @param userCodeClassLoader the user code classloader
         *
-        * @throws IOException
+        * @throws IOException Thrown if the snapshot data could not be read or 
parsed.
         */
-       void read(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException;
+       void readSnapshot(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException;
 
        /**
         * Recreates a serializer instance from this snapshot. The returned
@@ -114,8 +114,16 @@
        TypeSerializer<T> restoreSerializer();
 
        /**
-        * Checks a new serializer's compatibility to read data written by the 
prior
-        * serializer.
+        * Checks a new serializer's compatibility to read data written by the 
prior serializer.
+        *
+        * <p>When a checkpoint/savepoint is restored, this method checks 
whether the serialization
+        * format of the data in the checkpoint/savepoint is compatible for the 
format of the serializer used by the
+        * program that restores the checkpoint/savepoint. The outcome can be 
that the serialization format is
+        * compatible, that the program's serializer needs to reconfigure 
itself (meaning to incorporate some
+        * information from the TypeSerializerSnapshot to be compatible), that 
the format is outright incompatible,
+        * or that a migration is needed. In the latter case, the 
TypeSerializerSnapshot produces a serializer to
+        * deserialize the data, and the restoring program's serializer 
re-serializes the data, thus converting
+        * the format during the restore operation.
         *
         * @param newSerializer the new serializer to check.
         * @param <NS> the type of the new serializer
@@ -124,4 +132,35 @@
         */
        <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> 
resolveSchemaCompatibility(NS newSerializer);
 
+       // 
------------------------------------------------------------------------
+       //  read / write utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Writes the given snapshot to the out stream. One should always use 
this method to write
+        * snapshots out, rather than directly calling {@link 
#writeSnapshot(DataOutputView)}.
+        *
+        * <p>The snapshot written with this method can be read via {@link 
#readVersionedSnapshot(DataInputView, ClassLoader)}.
+        */
+       static void writeVersionedSnapshot(DataOutputView out, 
TypeSerializerSnapshot<?> snapshot) throws IOException {
+               out.writeUTF(snapshot.getClass().getName());
+               out.writeInt(snapshot.getCurrentVersion());
+               snapshot.writeSnapshot(out);
+       }
+
+
+       /**
+        * Reads a snapshot from the stream, resolving and instantiating the snapshot class before reading its versioned contents.
+        *
+        * <p>This method reads snapshots written by {@link 
#writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)}.
+        */
+       static <T> TypeSerializerSnapshot<T> 
readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
+               final TypeSerializerSnapshot<T> snapshot =
+                               
TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);
+
+               final int version = in.readInt();
+               snapshot.readSnapshot(version, in, cl);
+
+               return snapshot;
+       }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
index 0bcff93b802..8e12e301cbb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotSerializationUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.typeutils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -25,8 +26,11 @@
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Utility methods for serialization of {@link TypeSerializerSnapshot}.
  */
@@ -42,8 +46,6 @@
         * @param serializerSnapshot the serializer configuration snapshot to 
write
         * @param serializer the prior serializer. This needs to be written of 
the serializer snapshot
         *                   if the serializer snapshot is still the legacy 
{@link TypeSerializerConfigSnapshot}.
-        *
-        * @throws IOException
         */
        public static <T> void writeSerializerSnapshot(
                DataOutputView out,
@@ -63,8 +65,6 @@
         *                                restoring from a snapshot taken with 
Flink version <= 1.6.
         *
         * @return the read serializer configuration snapshot
-        *
-        * @throws IOException
         */
        public static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
                        DataInputView in,
@@ -78,6 +78,30 @@
                return proxy.getSerializerSnapshot();
        }
 
+
+       public static <T> TypeSerializerSnapshot<T> 
readAndInstantiateSnapshotClass(DataInputView in, ClassLoader cl) throws 
IOException {
+               final String className = in.readUTF();
+
+               final Class<? extends TypeSerializerSnapshot> rawClazz;
+               try {
+                       rawClazz = Class
+                                       .forName(className, false, cl)
+                                       
.asSubclass(TypeSerializerSnapshot.class);
+               }
+               catch (ClassNotFoundException e) {
+                       throw new IOException(
+                                       "Could not find requested 
TypeSerializerSnapshot class '" + className +  "' in classpath.", e);
+               }
+               catch (ClassCastException e) {
+                       throw new IOException("The class '" + className + "' is 
not a subclass of TypeSerializerSnapshot.", e);
+               }
+
+               @SuppressWarnings("unchecked")
+               final Class<? extends TypeSerializerSnapshot<T>> clazz = 
(Class<? extends TypeSerializerSnapshot<T>>) rawClazz;
+
+               return InstantiationUtil.instantiate(clazz);
+       }
+
        /**
         * Utility serialization proxy for a {@link TypeSerializerSnapshot}.
         */
@@ -87,8 +111,11 @@
 
                private ClassLoader userCodeClassLoader;
                private TypeSerializerSnapshot<T> serializerSnapshot;
-               private TypeSerializer<T> serializer;
+               @Nullable private TypeSerializer<T> serializer;
 
+               /**
+                * Constructor for reading serializers.
+                */
                TypeSerializerSnapshotSerializationProxy(
                        ClassLoader userCodeClassLoader,
                        @Nullable TypeSerializer<T> existingPriorSerializer) {
@@ -96,6 +123,9 @@
                        this.serializer = existingPriorSerializer;
                }
 
+               /**
+                * Constructor for writing out serializers.
+                */
                TypeSerializerSnapshotSerializationProxy(
                        TypeSerializerSnapshot<T> serializerConfigSnapshot,
                        TypeSerializer<T> serializer) {
@@ -107,91 +137,39 @@
                 * Binary format layout of a written serializer snapshot is as 
follows:
                 *
                 * <ul>
-                *     <li>1. Serializer snapshot classname (UTF).</li>
-                *     <li>2. The originating serializer of the snapshot, if 
any, written via Java serialization.
-                *         Presence of the serializer is indicated by a flag 
(boolean -> TypeSerializer).</li>
-                *     <li>3. The version of the serializer snapshot's binary 
format.</li>
-                *     <li>4. The actual serializer snapshot.</li>
+                *     <li>1. Format version of this util.</li>
+                *     <li>2. Name of the TypeSerializerSnapshot class.</li>
+                *     <li>3. The version of the TypeSerializerSnapshot's 
binary format.</li>
+                *     <li>4. The actual serializer snapshot data.</li>
                 * </ul>
                 */
+               @SuppressWarnings("deprecation")
                @Override
                public void write(DataOutputView out) throws IOException {
-                       super.write(out);
+                       setSerializerForWriteIfOldPath(serializerSnapshot, 
serializer);
 
-                       // config snapshot class, so that we can re-instantiate 
the
-                       // correct type of config snapshot instance when 
deserializing
-                       out.writeUTF(serializerSnapshot.getClass().getName());
+                       // write the format version of this util's format
+                       super.write(out);
 
-                       if (serializerSnapshot instanceof 
TypeSerializerConfigSnapshot) {
-                               // backwards compatible path, where the 
serializer snapshot is still using the
-                               // deprecated interface; the originating 
serializer needs to be written to the byte stream
-                               out.writeBoolean(true);
-                               @SuppressWarnings("unchecked")
-                               TypeSerializerConfigSnapshot<T> 
legacySerializerSnapshot = (TypeSerializerConfigSnapshot<T>) serializerSnapshot;
-                               
TypeSerializerSerializationUtil.writeSerializer(out, serializer);
-
-                               // TypeSerializerConfigSnapshot includes the 
version number implicitly when it is written
-                               legacySerializerSnapshot.write(out);
-                       } else {
-                               out.writeBoolean(false);
-
-                               
out.writeInt(serializerSnapshot.getCurrentVersion());
-                               serializerSnapshot.write(out);
-                       }
+                       TypeSerializerSnapshot.writeVersionedSnapshot(out, 
serializerSnapshot);
                }
 
                @SuppressWarnings("unchecked")
                @Override
                public void read(DataInputView in) throws IOException {
+                       // read version
                        super.read(in);
-
-                       String serializerConfigClassname = in.readUTF();
-                       Class<? extends TypeSerializerSnapshot> 
serializerConfigSnapshotClass;
-                       try {
-                               serializerConfigSnapshotClass = (Class<? 
extends TypeSerializerSnapshot>)
-                                       
Class.forName(serializerConfigClassname, false, userCodeClassLoader);
-                       } catch (ClassNotFoundException e) {
-                               throw new IOException(
-                                       "Could not find requested 
TypeSerializerConfigSnapshot class "
-                                               + serializerConfigClassname +  
" in classpath.", e);
-                       }
-
-                       serializerSnapshot = 
InstantiationUtil.instantiate(serializerConfigSnapshotClass);
-
-                       if (getReadVersion() >= 2) {
-                               // Flink version after 1.7
-
-                               boolean containsPriorSerializer = 
in.readBoolean();
-
-                               TypeSerializer<T> priorSerializer = 
(containsPriorSerializer)
-                                       ? 
TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader, true)
-                                       : null;
-
-                               if (serializerSnapshot instanceof 
TypeSerializerConfigSnapshot) {
-                                       if (priorSerializer != null) {
-                                               
((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).setPriorSerializer(priorSerializer);
-                                               
((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
-                                               
((TypeSerializerConfigSnapshot<T>) serializerSnapshot).read(in);
-                                       } else {
-                                               // this occurs if the user 
changed a TypeSerializerSnapshot to the
-                                               // legacy 
TypeSerializerConfigSnapshot, which isn't supported.
-                                               throw new IOException("Cannot 
read a legacy TypeSerializerConfigSnapshot without the prior serializer 
present. ");
-                                       }
-                               } else {
-                                       int readVersion = in.readInt();
-                                       serializerSnapshot.read(readVersion, 
in, userCodeClassLoader);
-                               }
-                       } else {
-                               // Flink version before 1.7.x, and after 1.3.x
-
-                               if (serializerSnapshot instanceof 
TypeSerializerConfigSnapshot) {
-                                       ((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).setPriorSerializer(this.serializer);
-                                       ((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).setUserCodeClassLoader(userCodeClassLoader);
-                                       ((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).read(in);
-                               } else {
-                                       int readVersion = in.readInt();
-                                       serializerSnapshot.read(readVersion, 
in, userCodeClassLoader);
-                               }
+                       final int version = getReadVersion();
+
+                       switch (version) {
+                               case 2:
+                                       serializerSnapshot = deserializeV2(in, 
userCodeClassLoader);
+                                       break;
+                               case 1:
+                                       serializerSnapshot = deserializeV1(in, 
userCodeClassLoader, serializer);
+                                       break;
+                               default:
+                                       throw new IOException("Unrecognized 
version for TypeSerializerSnapshot format: " + version);
                        }
                }
 
@@ -208,5 +186,57 @@ public int getVersion() {
                TypeSerializerSnapshot<T> getSerializerSnapshot() {
                        return serializerSnapshot;
                }
+
+               /**
+                * Deserialization path for Flink versions 1.7+.
+                */
+               @VisibleForTesting
+               static <T> TypeSerializerSnapshot<T> 
deserializeV2(DataInputView in, ClassLoader cl) throws IOException {
+                       return TypeSerializerSnapshot.readVersionedSnapshot(in, 
cl);
+               }
+
+               /**
+                * Deserialization path for Flink versions in [1.4, 1.6].
+                */
+               @VisibleForTesting
+               @SuppressWarnings("deprecation")
+               static <T> TypeSerializerSnapshot<T> deserializeV1(
+                               DataInputView in,
+                               ClassLoader cl,
+                               @Nullable TypeSerializer<T> serializer) throws 
IOException {
+
+                       TypeSerializerSnapshot<T> snapshot = 
readAndInstantiateSnapshotClass(in, cl);
+
+                       // if the snapshot was created before Flink 1.7, we 
need to distinguish the following cases:
+                       //   - old snapshot type that needs serializer from the 
outside
+                       //   - new snapshot type that understands the old 
format and can produce a restore serializer from it
+                       if (snapshot instanceof TypeSerializerConfigSnapshot) {
+                               TypeSerializerConfigSnapshot<T> oldTypeSnapshot 
= (TypeSerializerConfigSnapshot<T>) snapshot;
+                               oldTypeSnapshot.setPriorSerializer(serializer);
+                               oldTypeSnapshot.setUserCodeClassLoader(cl);
+                               oldTypeSnapshot.read(in);
+                       }
+                       else {
+                               // new type, simple case
+                               int readVersion = in.readInt();
+                               snapshot.readSnapshot(readVersion, in, cl);
+                       }
+
+                       return snapshot;
+               }
+
+               @SuppressWarnings("deprecation")
+               private static <T> void setSerializerForWriteIfOldPath(
+                               TypeSerializerSnapshot<T> serializerSnapshot,
+                               TypeSerializer<T> serializer) {
+
+                       // for compatibility with non-upgraded serializers, put 
the serializer into the
+                       // config snapshot if it is of the old version
+                       if (serializerSnapshot instanceof 
TypeSerializerConfigSnapshot) {
+                               checkState(serializer != null);
+
+                               ((TypeSerializerConfigSnapshot<T>) 
serializerSnapshot).setPriorSerializer(serializer);
+                       }
+               }
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
index 448c53b209e..fe55be79e12 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/UnloadableDummyTypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InvalidClassException;
 import java.util.Arrays;
@@ -33,16 +34,30 @@
 public class UnloadableDummyTypeSerializer<T> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 2526330533671642711L;
+
        private final byte[] actualBytes;
 
+       @Nullable
+       private final Throwable originalError;
+
        public UnloadableDummyTypeSerializer(byte[] actualBytes) {
+               this(actualBytes, null);
+       }
+
+       public UnloadableDummyTypeSerializer(byte[] actualBytes, @Nullable 
Throwable originalError) {
                this.actualBytes = Preconditions.checkNotNull(actualBytes);
+               this.originalError = originalError;
        }
 
        public byte[] getActualBytes() {
                return actualBytes;
        }
 
+       @Nullable
+       public Throwable getOriginalError() {
+               return originalError;
+       }
+
        @Override
        public boolean isImmutableType() {
                throw new UnsupportedOperationException("This object is a dummy 
TypeSerializer.");
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index a79ac1de6fc..9c6bcb50bd5 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,7 +20,6 @@
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -121,12 +120,8 @@ public void testSnapshotConfigurationAndReconfigure() 
throws Exception {
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), getSerializer());
                }
 
-               CompatibilityResult strategy = 
getSerializer().ensureCompatibility(restoredConfig);
-               assertFalse(strategy.isRequiresMigration());
-
-               // also verify that the serializer's reconfigure implementation 
detects incompatibility
-               strategy = getSerializer().ensureCompatibility(new 
TestIncompatibleSerializerConfigSnapshot<>());
-               assertTrue(strategy.isRequiresMigration());
+               TypeSerializerSchemaCompatibility<T, ? extends 
TypeSerializer<T>> strategy = 
restoredConfig.resolveSchemaCompatibility(getSerializer());
+               assertTrue(strategy.isCompatibleAsIs());
        }
 
        @Test
@@ -543,23 +538,6 @@ public void skipBytesToRead(int numBytes) throws 
IOException {
                }
        }
 
-       public static final class TestIncompatibleSerializerConfigSnapshot<T> 
extends TypeSerializerConfigSnapshot<T> {
-               @Override
-               public int getVersion() {
-                       return 0;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       return obj instanceof 
TestIncompatibleSerializerConfigSnapshot;
-               }
-
-               @Override
-               public int hashCode() {
-                       return getClass().hashCode();
-               }
-       }
-
        private static <T> void checkToString(T value) {
                if (value != null) {
                        value.toString();
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
index 8f82ea88ed9..0fae2699807 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -106,8 +107,8 @@ public void testConfigurationSnapshotSerialization() throws 
Exception {
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), serializer);
                }
 
-               CompatibilityResult<PublicEnum> compatResult = 
serializer.ensureCompatibility(restoredConfig);
-               assertFalse(compatResult.isRequiresMigration());
+               TypeSerializerSchemaCompatibility<PublicEnum, ?> compatResult = 
restoredConfig.resolveSchemaCompatibility(serializer);
+               assertTrue(compatResult.isCompatibleAsIs());
 
                assertEquals(PublicEnum.FOO.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
                assertEquals(PublicEnum.BAR.ordinal(), 
serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
index b4f983382f5..e906f62f65b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerUpgradeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -56,7 +56,7 @@
         */
        @Test
        public void checkIndenticalEnums() throws Exception {
-               Assert.assertFalse(checkCompatibility(ENUM_A, 
ENUM_A).isRequiresMigration());
+               Assert.assertTrue(checkCompatibility(ENUM_A, 
ENUM_A).isCompatibleAsIs());
        }
 
        /**
@@ -64,7 +64,7 @@ public void checkIndenticalEnums() throws Exception {
         */
        @Test
        public void checkAppendedField() throws Exception {
-               Assert.assertFalse(checkCompatibility(ENUM_A, 
ENUM_B).isRequiresMigration());
+               Assert.assertTrue(checkCompatibility(ENUM_A, 
ENUM_B).isCompatibleAsIs());
        }
 
        /**
@@ -72,7 +72,7 @@ public void checkAppendedField() throws Exception {
         */
        @Test
        public void checkRemovedField() throws Exception {
-               Assert.assertTrue(checkCompatibility(ENUM_A, 
ENUM_C).isRequiresMigration());
+               Assert.assertTrue(checkCompatibility(ENUM_A, 
ENUM_C).isIncompatible());
        }
 
        /**
@@ -80,11 +80,11 @@ public void checkRemovedField() throws Exception {
         */
        @Test
        public void checkDifferentFieldOrder() throws Exception {
-               Assert.assertFalse(checkCompatibility(ENUM_A, 
ENUM_D).isRequiresMigration());
+               Assert.assertTrue(checkCompatibility(ENUM_A, 
ENUM_D).isCompatibleAsIs());
        }
 
        @SuppressWarnings("unchecked")
-       private static CompatibilityResult checkCompatibility(String 
enumSourceA, String enumSourceB)
+       private static TypeSerializerSchemaCompatibility 
checkCompatibility(String enumSourceA, String enumSourceB)
                throws IOException, ClassNotFoundException {
 
                ClassLoader classLoader = compileAndLoadEnum(
@@ -116,7 +116,7 @@ private static CompatibilityResult 
checkCompatibility(String enumSourceA, String
                }
 
                EnumSerializer enumSerializer2 = new 
EnumSerializer(classLoader2.loadClass(ENUM_NAME));
-               return enumSerializer2.ensureCompatibility(restoredSnapshot);
+               return 
restoredSnapshot.resolveSchemaCompatibility(enumSerializer2);
        }
 
        private static ClassLoader compileAndLoadEnum(File root, String 
filename, String source) throws IOException {
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 9620846a4a6..4f54c3ee4d9 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -34,13 +34,13 @@
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
@@ -63,7 +63,6 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -311,8 +310,10 @@ public void testReconfigureWithDifferentPojoType() throws 
Exception {
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), pojoSerializer2);
                }
 
-               CompatibilityResult<SubTestUserClassA> compatResult = 
pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertTrue(compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<SubTestUserClassA, ?> 
compatResult =
+                       
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer2);
+               assertTrue(compatResult.isIncompatible());
        }
 
        /**
@@ -352,8 +353,10 @@ public void 
testReconfigureDifferentSubclassRegistrationOrder() throws Exception
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), pojoSerializer);
                }
 
-               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertTrue(!compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestUserClass, ?> 
compatResult =
+                       
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+               assertTrue(compatResult.isCompatibleAsIs());
 
                // reconfigure - check reconfiguration result and that 
registration ids remains the same
                //assertEquals(ReconfigureResult.COMPATIBLE, 
pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
@@ -397,8 +400,10 @@ public void 
testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro
                }
 
                // reconfigure - check reconfiguration result and that subclass 
serializer cache is repopulated
-               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertFalse(compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestUserClass, ?> 
compatResult =
+                       
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+               assertTrue(compatResult.isCompatibleAsIs());
                assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -460,8 +465,10 @@ public void 
testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
                // reconfigure - check reconfiguration result and that
                // 1) subclass serializer cache is repopulated
                // 2) registrations also contain the now registered subclasses
-               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(pojoSerializerConfigSnapshot);
-               assertFalse(compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestUserClass, ?> 
compatResult =
+                       
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+               assertTrue(compatResult.isCompatibleAsIs());
                assertEquals(2, 
pojoSerializer.getSubclassSerializerCache().size());
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
                
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
@@ -537,8 +544,9 @@ public void testReconfigureWithDifferentFieldOrder() throws 
Exception {
                                new HashMap<>()); // empty; irrelevant for this 
test
 
                // reconfigure - check reconfiguration result and that fields 
are reordered to the previous order
-               CompatibilityResult<TestUserClass> compatResult = 
pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot);
-               assertFalse(compatResult.isRequiresMigration());
+               TypeSerializerSchemaCompatibility<TestUserClass, ?> 
compatResult =
+                       
mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
+               assertTrue(compatResult.isCompatibleAsIs());
                int i = 0;
                for (Field field : mockOriginalFieldOrder) {
                        assertEquals(field, pojoSerializer.getFields()[i]);
@@ -580,7 +588,7 @@ public void testSerializerSerializationFailureResilience() 
throws Exception{
                                        pojoSerializer);
                }
 
-               
Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration());
+               
Assert.assertTrue(deserializedConfig.resolveSchemaCompatibility(pojoSerializer).isCompatibleAsIs());
                
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config, 
deserializedConfig);
        }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 3c9d990fb31..869941e9174 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.java.typeutils.runtime.kryo;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -44,7 +44,6 @@
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -67,8 +66,11 @@ public void testMigrationStrategyForRemovedAvroDependency() 
throws Exception {
                        kryoSerializerConfigSnapshot = 
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), kryoSerializerForA);
                }
-               CompatibilityResult<TestClass> compatResult = 
kryoSerializerForA.ensureCompatibility(kryoSerializerConfigSnapshot);
-               assertFalse(compatResult.isRequiresMigration());
+
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+                       
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForA);
+               assertTrue(compatResult.isCompatibleAsIs());
        }
 
        @Test
@@ -111,8 +113,10 @@ public void testMigrationStrategyWithDifferentKryoType() 
throws Exception {
                                new DataInputViewStreamWrapper(in), 
Thread.currentThread().getContextClassLoader(), kryoSerializerForB);
                }
 
-               CompatibilityResult<TestClassB> compatResult = 
kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
-               assertTrue(compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestClassB, ?> compatResult =
+                       
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForB);
+               assertTrue(compatResult.isIncompatible());
        }
 
        @Test
@@ -272,8 +276,10 @@ public void 
testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio
                }
 
                // reconfigure - check reconfiguration result and that 
registration id remains the same
-               CompatibilityResult<TestClass> compatResult = 
kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
-               assertFalse(compatResult.isRequiresMigration());
+               @SuppressWarnings("unchecked")
+               TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
+                       
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializer);
+               assertTrue(compatResult.isCompatibleAsIs());
                assertEquals(testClassId, 
kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
                assertEquals(testClassAId, 
kryoSerializer.getKryo().getRegistration(TestClassA.class).getId());
                assertEquals(testClassBId, 
kryoSerializer.getKryo().getRegistration(TestClassB.class).getId());
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
new file mode 100644
index 00000000000..1dd56a775aa
--- /dev/null
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/LegacyAvroExternalJarProgramITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.avro.testjar.AvroExternalJarProgram;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+
+/**
+ * IT case for the {@link AvroExternalJarProgram}.
+ */
+public class LegacyAvroExternalJarProgramITCase extends TestLogger {
+
+       private static final String JAR_FILE = "maven-test-jar.jar";
+
+       private static final String TEST_DATA_FILE = "/testdata.avro";
+
+       @Test
+       public void testExternalProgram() {
+
+               LocalFlinkMiniCluster testMiniCluster = null;
+
+               try {
+                       int parallelism = 4;
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+                       testMiniCluster = new LocalFlinkMiniCluster(config, 
false);
+                       testMiniCluster.start();
+
+                       String jarFile = JAR_FILE;
+                       String testData = 
getClass().getResource(TEST_DATA_FILE).toString();
+
+                       PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
+
+                       TestEnvironment.setAsContext(
+                               testMiniCluster,
+                               parallelism,
+                               Collections.singleton(new Path(jarFile)),
+                               Collections.<URL>emptyList());
+
+                       config.setString(JobManagerOptions.ADDRESS, 
"localhost");
+                       config.setInteger(JobManagerOptions.PORT, 
testMiniCluster.getLeaderRPCPort());
+
+                       program.invokeInteractiveModeForExecution();
+               }
+               catch (Throwable t) {
+                       System.err.println(t.getMessage());
+                       t.printStackTrace();
+                       Assert.fail("Error during the packaged program 
execution: " + t.getMessage());
+               }
+               finally {
+                       TestEnvironment.unsetAsContext();
+
+                       if (testMiniCluster != null) {
+                               try {
+                                       testMiniCluster.stop();
+                               } catch (Throwable t) {
+                                       // ignore
+                               }
+                       }
+               }
+       }
+}
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index 7b8763bfa2f..463f8f6f26d 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -36,7 +36,6 @@
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -105,10 +104,10 @@ public void testCompatibilityWithPojoSerializer() throws 
Exception {
                validateDeserialization(serializer);
 
                // sanity check for the test: check that a PoJoSerializer and 
the original serializer work together
-               
assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+               
assertTrue(configSnapshot.resolveSchemaCompatibility(serializer).isCompatibleAsIs());
 
                final TypeSerializer<SimpleUser> newSerializer = new 
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-               
assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
+               
assertTrue(configSnapshot.resolveSchemaCompatibility(newSerializer).isCompatibleAsIs());
 
                // deserialize the data and make sure this still works
                validateDeserialization(newSerializer);
@@ -116,7 +115,7 @@ public void testCompatibilityWithPojoSerializer() throws 
Exception {
                TypeSerializerSnapshot<SimpleUser> nextSnapshot = 
newSerializer.snapshotConfiguration();
                final TypeSerializer<SimpleUser> nextSerializer = new 
AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
 
-               
assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
+               
assertTrue(nextSnapshot.resolveSchemaCompatibility(nextSerializer).isCompatibleAsIs());
 
                // deserialize the data and make sure this still works
                validateDeserialization(nextSerializer);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
                        final StateMetaInfoSnapshot metaInfoSnapshot = 
restoredBroadcastStateMetaInfos.get(name);
 
-                       @SuppressWarnings("unchecked")
-                       RegisteredBroadcastStateBackendMetaInfo<K, V> 
restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, 
V>(metaInfoSnapshot);
+                       // check whether new serializers are incompatible
+                       TypeSerializerSnapshot<K> keySerializerSnapshot = 
Preconditions.checkNotNull(
+                               (TypeSerializerSnapshot<K>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-                       // check compatibility to determine if state migration 
is required
-                       CompatibilityResult<K> keyCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       restoredMetaInfo.getKeySerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-                                       
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-                                       broadcastStateKeySerializer);
-
-                       CompatibilityResult<V> valueCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       restoredMetaInfo.getValueSerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-                                       
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-                                       broadcastStateValueSerializer);
-
-                       if (!keyCompatibility.isRequiresMigration() && 
!valueCompatibility.isRequiresMigration()) {
-                               // new serializer is compatible; use it to 
replace the old serializer
-                               broadcastState.setStateMetaInfo(
-                                               new 
RegisteredBroadcastStateBackendMetaInfo<>(
-                                                               name,
-                                                               
OperatorStateHandle.Mode.BROADCAST,
-                                                               
broadcastStateKeySerializer,
-                                                               
broadcastStateValueSerializer));
-                       } else {
-                               // TODO state migration currently isn't 
possible.
-
-                               // NOTE: for heap backends, it is actually fine 
to proceed here without failing the restore,
-                               // since the state has already been 
deserialized to objects and we can just continue with
-                               // the new serializer; we're deliberately 
failing here for now to have equal functionality with
-                               // the RocksDB backend to avoid confusion for 
users.
-
-                               throw StateMigrationException.notSupported();
+                       TypeSerializerSchemaCompatibility<K, ?> 
keyCompatibility =
+                               
keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
+                       if (keyCompatibility.isIncompatible()) {
+                               throw new StateMigrationException("The new key 
serializer for broadcast state must not be incompatible.");
+                       }
+
+                       TypeSerializerSnapshot<V> valueSerializerSnapshot = 
Preconditions.checkNotNull(
+                               (TypeSerializerSnapshot<V>) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+                       TypeSerializerSchemaCompatibility<V, ?> 
valueCompatibility =
+                               
valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
+                       if (valueCompatibility.isIncompatible()) {
+                               throw new StateMigrationException("The new 
value serializer for broadcast state must not be incompatible.");
                        }
+
+                       // new serializer is compatible; use it to replace the 
old serializer
+                       broadcastState.setStateMetaInfo(
+                                       new 
RegisteredBroadcastStateBackendMetaInfo<>(
+                                                       name,
+                                                       
OperatorStateHandle.Mode.BROADCAST,
+                                                       
broadcastStateKeySerializer,
+                                                       
broadcastStateValueSerializer));
                }
 
                accessedBroadcastStatesByName.put(name, broadcastState);
@@ -606,27 +596,19 @@ public void addAll(List<S> values) {
 
                        // check compatibility to determine if state migration 
is required
                        TypeSerializer<S> newPartitionStateSerializer = 
partitionStateSerializer.duplicate();
-                       CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       metaInfo.getPartitionStateSerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-                                       
restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-                                       newPartitionStateSerializer);
-
-                       if (!stateCompatibility.isRequiresMigration()) {
-                               // new serializer is compatible; use it to 
replace the old serializer
-                               partitionableListState.setStateMetaInfo(
-                                       new 
RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, 
mode));
-                       } else {
-                               // TODO state migration currently isn't 
possible.
-
-                               // NOTE: for heap backends, it is actually fine 
to proceed here without failing the restore,
-                               // since the state has already been 
deserialized to objects and we can just continue with
-                               // the new serializer; we're deliberately 
failing here for now to have equal functionality with
-                               // the RocksDB backend to avoid confusion for 
users.
-
-                               throw StateMigrationException.notSupported();
+
+                       @SuppressWarnings("unchecked")
+                       TypeSerializerSnapshot<S> stateSerializerSnapshot = 
Preconditions.checkNotNull(
+                               (TypeSerializerSnapshot<S>) 
restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+                       TypeSerializerSchemaCompatibility<S, ?> 
stateCompatibility =
+                               
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
+                       if (stateCompatibility.isIncompatible()) {
+                               throw new StateMigrationException("The new 
state serializer for operator state must not be incompatible.");
                        }
+
+                       partitionableListState.setStateMetaInfo(
+                               new 
RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, 
mode));
                }
 
                accessedStatesByName.put(name, partitionableListState);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index ab6e8b1a046..585490dc75f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -23,7 +23,6 @@
 import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -63,9 +62,6 @@
        /** This specifies if we use a compressed format write the key-groups */
        private boolean usingKeyGroupCompression;
 
-       /** This specifies whether or not to use dummy {@link 
UnloadableDummyTypeSerializer} when serializers cannot be read. */
-       private boolean isSerializerPresenceRequired;
-
        // TODO the keySerializer field should be removed, once all serializers 
have the restoreSerializer() method implemented
        private TypeSerializer<K> keySerializer;
        private TypeSerializerSnapshot<K> keySerializerConfigSnapshot;
@@ -74,9 +70,8 @@
 
        private ClassLoader userCodeClassLoader;
 
-       public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader, 
boolean isSerializerPresenceRequired) {
+       public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
-               this.isSerializerPresenceRequired = 
isSerializerPresenceRequired;
        }
 
        public KeyedBackendSerializationProxy(
@@ -98,10 +93,6 @@ public KeyedBackendSerializationProxy(
                return stateMetaInfoSnapshots;
        }
 
-       public TypeSerializer<K> restoreKeySerializer() {
-               return keySerializerConfigSnapshot.restoreSerializer();
-       }
-
        public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() {
                return keySerializerConfigSnapshot;
        }
@@ -163,10 +154,6 @@ public void read(DataInputView in) throws IOException {
                }
                this.keySerializer = null;
 
-               if (isSerializerPresenceRequired) {
-                       
checkSerializerPresence(this.keySerializerConfigSnapshot.restoreSerializer());
-               }
-
                Integer metaInfoSnapshotVersion = 
META_INFO_SNAPSHOT_FORMAT_VERSION_MAPPER.get(readVersion);
                if (metaInfoSnapshotVersion == null) {
                        // this should not happen; guard for the future
@@ -181,22 +168,7 @@ public void read(DataInputView in) throws IOException {
                for (int i = 0; i < numKvStates; i++) {
                        StateMetaInfoSnapshot snapshot = 
stateMetaInfoReader.readStateMetaInfoSnapshot(in, userCodeClassLoader);
 
-                       if (isSerializerPresenceRequired) {
-                               checkSerializerPresence(
-                                       
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
-                               checkSerializerPresence(
-                                       
snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
-                       }
                        stateMetaInfoSnapshots.add(snapshot);
                }
        }
-
-       private void checkSerializerPresence(TypeSerializer<?> serializer) 
throws IOException {
-               if (serializer instanceof UnloadableDummyTypeSerializer) {
-                       throw new IOException("Unable to restore keyed state, 
because a previous serializer" +
-                               " of the keyed state is not present The 
serializer could have been removed from the classpath, " +
-                               " or its implementation have changed and could 
not be loaded. This is a temporary restriction that will" +
-                               " be fixed in future versions.");
-               }
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
index 7f95ed70326..a92527a0371 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
@@ -191,23 +189,25 @@ public int hashCode() {
                }
 
                // check compatibility results to determine if state migration 
is required
-               CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                       
restoredStateMetaInfoSnapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-                       null,
-                       
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER),
-                       newNamespaceSerializer);
+               @SuppressWarnings("unchecked")
+               TypeSerializerSnapshot<N> namespaceSerializerSnapshot = 
Preconditions.checkNotNull(
+                       (TypeSerializerSnapshot<N>) 
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
+
+               TypeSerializerSchemaCompatibility<N, ?> namespaceCompatibility =
+                       
namespaceSerializerSnapshot.resolveSchemaCompatibility(newNamespaceSerializer);
 
                TypeSerializer<S> newStateSerializer = 
newStateDescriptor.getSerializer();
-               CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                       restoredStateMetaInfoSnapshot.restoreTypeSerializer(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-                       UnloadableDummyTypeSerializer.class,
-                       
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
-                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER),
-                       newStateSerializer);
-
-               if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
+
+               @SuppressWarnings("unchecked")
+               TypeSerializerSnapshot<S> stateSerializerSnapshot = 
Preconditions.checkNotNull(
+                       (TypeSerializerSnapshot<S>) 
restoredStateMetaInfoSnapshot.getTypeSerializerConfigSnapshot(
+                               
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
+
+               TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
+                       
stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
+
+               if (!namespaceCompatibility.isCompatibleAsIs() || 
!stateCompatibility.isCompatibleAsIs()) {
                        // TODO state migration currently isn't possible.
                        throw StateMigrationException.notSupported();
                } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
index b7dff59aef0..4132d144a4a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredStateMetaInfoBase.java
@@ -42,21 +42,4 @@ public String getName() {
 
        @Nonnull
        public abstract StateMetaInfoSnapshot snapshot();
-
-       public static RegisteredStateMetaInfoBase fromMetaInfoSnapshot(@Nonnull 
StateMetaInfoSnapshot snapshot) {
-
-               final StateMetaInfoSnapshot.BackendStateType backendStateType = 
snapshot.getBackendStateType();
-               switch (backendStateType) {
-                       case KEY_VALUE:
-                               return new 
RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
-                       case OPERATOR:
-                               return new 
RegisteredOperatorStateBackendMetaInfo<>(snapshot);
-                       case BROADCAST:
-                               return new 
RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
-                       case PRIORITY_QUEUE:
-                               return new 
RegisteredPriorityQueueStateBackendMetaInfo<>(snapshot);
-                       default:
-                               throw new IllegalArgumentException("Unknown 
backend state type: " + backendStateType);
-               }
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 837e51fafc0..6ade53caf10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -28,10 +28,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -206,14 +205,15 @@ public HeapKeyedStateBackend(
                        StateMetaInfoSnapshot.CommonSerializerKeys 
serializerKey =
                                
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
 
-                       CompatibilityResult<T> compatibilityResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                               
restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey),
-                               null,
-                               
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-                               byteOrderedElementSerializer);
+                       @SuppressWarnings("unchecked")
+                       TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
+                               (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+                       TypeSerializerSchemaCompatibility<T, ?> 
compatibilityResult =
+                               
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-                       if (compatibilityResult.isRequiresMigration()) {
-                               throw new 
FlinkRuntimeException(StateMigrationException.notSupported());
+                       if (compatibilityResult.isIncompatible()) {
+                               throw new FlinkRuntimeException(new 
StateMigrationException("For heap backends, the new priority queue serializer 
must not be incompatible."));
                        } else {
                                registeredPQStates.put(
                                        stateName,
@@ -405,26 +405,17 @@ private void restorePartitionedState(Collection<KeyedStateHandle> state) throws
                        try {
                                DataInputViewStreamWrapper inView = new 
DataInputViewStreamWrapper(fsDataInputStream);
 
-                               // isSerializerPresenceRequired flag is set to 
true, since for the heap state backend,
-                               // deserialization of state happens eagerly at 
restore time
                                KeyedBackendSerializationProxy<K> 
serializationProxy =
-                                               new 
KeyedBackendSerializationProxy<>(userCodeClassLoader, true);
+                                               new 
KeyedBackendSerializationProxy<>(userCodeClassLoader);
 
                                serializationProxy.read(inView);
 
                                if (!keySerializerRestored) {
                                        // check for key serializer 
compatibility; this also reconfigures the
                                        // key serializer to be compatible, if 
it is required and is possible
-                                       if 
(CompatibilityUtil.resolveCompatibilityResult(
-                                                       
serializationProxy.restoreKeySerializer(),
-                                                       
UnloadableDummyTypeSerializer.class,
-                                                       
serializationProxy.getKeySerializerConfigSnapshot(),
-                                                       keySerializer)
-                                               .isRequiresMigration()) {
-
-                                               // TODO replace with state 
migration; note that key hash codes need to remain the same after migration
-                                               throw new 
StateMigrationException("The new key serializer is not compatible to read 
previous keys. " +
-                                                       "Aborting now since 
state migration is currently not available");
+                                       if 
(!serializationProxy.getKeySerializerConfigSnapshot()
+                                                       
.resolveSchemaCompatibility(keySerializer).isCompatibleAsIs()) {
+                                               throw new 
StateMigrationException("The new key serializer must be compatible.");
                                        }
 
                                        keySerializerRestored = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 215d7d36c96..d5310412385 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -32,13 +32,13 @@
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
@@ -141,8 +141,8 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro
                        
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-               } catch (IOException expected) {
-                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore operator 
state"));
+               } catch (Exception expected) {
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
ClassNotFoundException.class).isPresent());
                } finally {
                        stateHandle.discardState();
                }
@@ -194,8 +194,8 @@ public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws
                                                
Collections.singleton(StringSerializer.class.getName()))));
 
                        fail("The keyed state restore should have failed if the previous state serializer could not be loaded.");
-               } catch (IOException expected) {
-                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed 
state"));
+               } catch (Exception expected) {
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
ClassNotFoundException.class).isPresent());
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index ab09557691a..4976b302e83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -889,8 +890,8 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception
                        
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        fail("The operator state restore should have failed if the previous state serializer could not be loaded.");
-               } catch (IOException expected) {
-                       
Assert.assertTrue(expected.getMessage().contains("Unable to restore operator 
state"));
+               } catch (Exception expected) {
+                       
Assert.assertTrue(ExceptionUtils.findThrowable(expected, 
ClassNotFoundException.class).isPresent());
                } finally {
                        stateHandle.discardState();
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 97665518d38..7858b5c74fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -34,6 +33,7 @@
 import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -72,14 +72,13 @@ public void testKeyedBackendSerializationProxyRoundtrip() throws Exception {
                }
 
                serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader(),
 true);
+                               new 
KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
                
Assert.assertTrue(serializationProxy.isUsingKeyGroupCompression());
-               Assert.assertEquals(keySerializer, 
serializationProxy.restoreKeySerializer());
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
                assertEqualStateMetaInfoSnapshotsLists(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
        }
@@ -120,21 +119,24 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
                        new KeyedBackendSerializationProxy<>(
                                new ArtificialCNFExceptionThrowingClassLoader(
                                        
Thread.currentThread().getContextClassLoader(),
-                                       cnfThrowingSerializerClasses),
-                               false);
+                                       cnfThrowingSerializerClasses));
 
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
                Assert.assertEquals(true, 
serializationProxy.isUsingKeyGroupCompression());
-               Assert.assertTrue(serializationProxy.restoreKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
 
                for (StateMetaInfoSnapshot snapshot : 
serializationProxy.getStateMetaInfoSnapshots()) {
-                       final RegisteredKeyValueStateBackendMetaInfo<?, ?> 
restoredMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
-                       
Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() instanceof 
UnloadableDummyTypeSerializer);
-                       Assert.assertTrue(restoredMetaInfo.getStateSerializer() 
instanceof UnloadableDummyTypeSerializer);
+                       try {
+                               // creating a registered meta info from the 
snapshot would fail, because the serializer snapshots
+                               // cannot create a proper restore serializer
+                               new 
RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+                       } catch (Exception e) {
+                               
Assert.assertTrue(ExceptionUtils.findThrowable(e, 
ClassNotFoundException.class).isPresent());
+                       }
+
                        
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
                        
Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
                }
@@ -168,7 +170,7 @@ public void testKeyedStateMetaInfoSerialization() throws Exception {
        }
 
        @Test
-       public void testKeyedStateMetaInfoReadSerializerFailureResilience() 
throws Exception {
+       public void 
testKeyedStateMetaInfoReadWithSerializerSerializationFailure() throws Exception 
{
                String name = "test";
                TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
@@ -198,11 +200,15 @@ public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Excep
                                new DataInputViewStreamWrapper(in), 
classLoader);
                }
 
-               RegisteredKeyValueStateBackendMetaInfo<?, ?> restoredMetaInfo = 
new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+               try {
+                       // creating a registered meta info from the snapshot 
would fail, because the serializer snapshots
+                       // cannot create a proper restore serializer
+                       new RegisteredKeyValueStateBackendMetaInfo<>(snapshot);
+               } catch (Exception e) {
+                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
ClassNotFoundException.class).isPresent());
+               }
 
-               Assert.assertEquals(name, restoredMetaInfo.getName());
-               Assert.assertTrue(restoredMetaInfo.getNamespaceSerializer() 
instanceof UnloadableDummyTypeSerializer);
-               Assert.assertTrue(restoredMetaInfo.getStateSerializer() 
instanceof UnloadableDummyTypeSerializer);
+               Assert.assertEquals(name, snapshot.getName());
                
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER));
                Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
@@ -320,7 +326,7 @@ public void testBroadcastStateMetaInfoSerialization() throws Exception {
        }
 
        @Test
-       public void testOperatorStateMetaInfoReadSerializerFailureResilience() 
throws Exception {
+       public void 
testOperatorStateMetaInfoReadWithSerializerSerializationFailure() throws 
Exception {
                String name = "test";
                TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
 
@@ -348,18 +354,22 @@ public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Ex
                        snapshot = reader.readStateMetaInfoSnapshot(new 
DataInputViewStreamWrapper(in), classLoader);
                }
 
-               RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
+               try {
+                       // creating a registered meta info from the snapshot 
would fail, because the serializer snapshots
+                       // cannot create a proper restore serializer
                        new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
+               } catch (Exception e) {
+                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
ClassNotFoundException.class).isPresent());
+               }
 
-               Assert.assertEquals(name, restoredMetaInfo.getName());
-               
Assert.assertTrue(restoredMetaInfo.getPartitionStateSerializer() instanceof 
UnloadableDummyTypeSerializer);
+               Assert.assertEquals(name, snapshot.getName());
                Assert.assertEquals(
                        stateSerializer.snapshotConfiguration(),
                        
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
 
        @Test
-       public void testBroadcastStateMetaInfoReadSerializerFailureResilience() 
throws Exception {
+       public void 
testBroadcastStateMetaInfoReadWithSerializerSerializationFailure() throws 
Exception {
                String broadcastName = "broadcastTest";
                TypeSerializer<?> keySerializer = DoubleSerializer.INSTANCE;
                TypeSerializer<?> valueSerializer = StringSerializer.INSTANCE;
@@ -393,14 +403,17 @@ public void testBroadcastStateMetaInfoReadSerializerFailureResilience() throws E
                        snapshot = reader.readStateMetaInfoSnapshot(new 
DataInputViewStreamWrapper(in), classLoader);
                }
 
-               RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
+               try {
+                       // creating a registered meta info from the snapshot 
would fail, because the serializer snapshots
+                       // cannot create a proper restore serializer
                        new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
+               } catch (Exception e) {
+                       Assert.assertTrue(ExceptionUtils.findThrowable(e, 
ClassNotFoundException.class).isPresent());
+               }
 
-               Assert.assertEquals(broadcastName, restoredMetaInfo.getName());
-               Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST, 
restoredMetaInfo.getAssignmentMode());
-               Assert.assertTrue(restoredMetaInfo.getKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
+               Assert.assertEquals(broadcastName, snapshot.getName());
+               
Assert.assertEquals(OperatorStateHandle.Mode.BROADCAST.toString(), 
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE));
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
-               Assert.assertTrue(restoredMetaInfo.getValueSerializer() 
instanceof UnloadableDummyTypeSerializer);
                Assert.assertEquals(valueSerializer.snapshotConfiguration(), 
snapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
        }
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
index 3b8331abdf6..5990857c3cc 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala
@@ -21,13 +21,13 @@ package org.apache.flink.api.scala.runtime
 import java.io.InputStream
 
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
+import org.apache.flink.api.common.typeutils.{TypeSerializer, 
TypeSerializerSerializationUtil, TypeSerializerSnapshot}
 import 
org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
 import org.apache.flink.api.scala.createTypeInformation
 import 
org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.core.memory.DataInputViewStreamWrapper
-import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
+import org.junit.Assert.{assertEquals, assertNotNull, assertTrue}
 import org.junit.Test
 
 /**
@@ -48,8 +48,11 @@ class TupleSerializerCompatibilityTest {
 
       assertEquals(1, deserialized.size)
 
-      val oldSerializer = deserialized.get(0).f0
-      val oldConfigSnapshot = deserialized.get(0).f1
+      val oldSerializer: TypeSerializer[TestCaseClass] =
+        deserialized.get(0).f0.asInstanceOf[TypeSerializer[TestCaseClass]]
+
+      val oldConfigSnapshot: TypeSerializerSnapshot[TestCaseClass] =
+        
deserialized.get(0).f1.asInstanceOf[TypeSerializerSnapshot[TestCaseClass]]
 
       // test serializer and config snapshot
       assertNotNull(oldSerializer)
@@ -61,9 +64,9 @@ class TupleSerializerCompatibilityTest {
 
       val currentSerializer = createTypeInformation[TestCaseClass]
         .createSerializer(new ExecutionConfig())
-      assertFalse(currentSerializer
-        .ensureCompatibility(oldConfigSnapshot)
-        .isRequiresMigration)
+      assertTrue(oldConfigSnapshot
+        .resolveSchemaCompatibility(currentSerializer)
+        .isCompatibleAsIs)
 
       // test old data serialization
       is.close()
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 55e2419a97a..f400dc33612 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.scala.typeutils
 import java.io._
 import java.net.{URL, URLClassLoader}
 
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, 
TypeSerializerSnapshotSerializationUtil}
+import 
org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, 
TypeSerializerSnapshotSerializationUtil}
 import org.apache.flink.core.memory.{DataInputViewStreamWrapper, 
DataOutputViewStreamWrapper}
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
@@ -84,7 +84,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkIdenticalEnums(): Unit = {
-    assertFalse(checkCompatibility(enumA, enumA).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
   }
 
   /**
@@ -92,7 +92,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkAppendedField(): Unit = {
-    assertFalse(checkCompatibility(enumA, enumB).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
   }
 
   /**
@@ -100,7 +100,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkRemovedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumC).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
   }
 
   /**
@@ -108,7 +108,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     */
   @Test
   def checkDifferentFieldOrder(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumD).isRequiresMigration)
+    assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
   }
 
   /**
@@ -117,12 +117,12 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
   @Test
   def checkDifferentIds(): Unit = {
     assertTrue(
-      "Different ids should cause a migration.",
-      checkCompatibility(enumA, enumE).isRequiresMigration)
+      "Different ids should be incompatible.",
+      checkCompatibility(enumA, enumE).isIncompatible)
   }
 
   def checkCompatibility(enumSourceA: String, enumSourceB: String)
-    : CompatibilityResult[Enumeration#Value] = {
+    : TypeSerializerSchemaCompatibility[Enumeration#Value, _] = {
     import EnumValueSerializerUpgradeTest._
 
     val classLoader = compileAndLoadEnum(tempFolder.newFolder(), 
s"$enumName.scala", enumSourceA)
@@ -152,7 +152,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
     val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
 
     val enumValueSerializer2 = new EnumValueSerializer(enum2)
-    enumValueSerializer2.ensureCompatibility(snapshot2)
+    snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
   }
 }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a4b4ce80dc2..885d582bbc8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -27,10 +27,9 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
@@ -724,22 +723,15 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
                        // that the new serializer for states could be 
compatible, and therefore the restore can continue
                        // without old serializers required to be present.
                        KeyedBackendSerializationProxy<K> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, 
false);
+                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
                        serializationProxy.read(currentStateHandleInView);
 
                        // check for key serializer compatibility; this also 
reconfigures the
                        // key serializer to be compatible, if it is required 
and is possible
-                       if (CompatibilityUtil.resolveCompatibilityResult(
-                               serializationProxy.restoreKeySerializer(),
-                               UnloadableDummyTypeSerializer.class,
-                               
serializationProxy.getKeySerializerConfigSnapshot(),
-                               rocksDBKeyedStateBackend.keySerializer)
-                               .isRequiresMigration()) {
-
-                               // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
-                               throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
-                                       "Aborting now since state migration is 
currently not available");
+                       if (!serializationProxy.getKeySerializerConfigSnapshot()
+                                       
.resolveSchemaCompatibility(rocksDBKeyedStateBackend.keySerializer).isCompatibleAsIs())
 {
+                               throw new StateMigrationException("The new key serializer must be compatible.");
                        }
 
                        this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
@@ -761,15 +753,12 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationExceptio
                                                nameBytes,
                                                
rocksDBKeyedStateBackend.columnOptions);
 
-                                       RegisteredStateMetaInfoBase 
stateMetaInfo =
-                                               
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-
                                        
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
 
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
-                                       
rocksDBKeyedStateBackend.registerKvStateInformation(stateMetaInfo.getName(), 
registeredColumn);
+                                       registeredColumn = new 
Tuple2<>(columnFamily, null);
+                                       
rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), 
registeredColumn);
 
                                } else {
                                        // TODO with eager state registration 
in place, check here for serializer migration strategies
@@ -1079,13 +1068,10 @@ private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
                                
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
 
                        if (null == registeredStateMetaInfoEntry) {
-                               RegisteredStateMetaInfoBase stateMetaInfo =
-                                       
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
-
                                registeredStateMetaInfoEntry =
                                        new Tuple2<>(
                                                columnFamilyHandle != null ? 
columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-                                               stateMetaInfo);
+                                               null);
 
                                stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
@@ -1213,12 +1199,10 @@ private void restoreLocalStateIntoFullInstance(
                                StateMetaInfoSnapshot stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
                                ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandles.get(i);
-                               RegisteredStateMetaInfoBase stateMetaInfo =
-                                       
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
                                stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
-                                       new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
+                                       new Tuple2<>(columnFamilyHandle, null));
                        }
 
                        // use the restore sst files as the base for succeeding 
checkpoints
@@ -1275,22 +1259,15 @@ private void restoreInstanceDirectoryFromPath(Path source) throws IOException {
                                // that the new serializer for states could be 
compatible, and therefore the restore can continue
                                // without old serializers required to be 
present.
                                KeyedBackendSerializationProxy<T> 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader, false);
+                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
                                DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
                                serializationProxy.read(in);
 
                                // check for key serializer compatibility; this 
also reconfigures the
                                // key serializer to be compatible, if it is 
required and is possible
-                               if 
(CompatibilityUtil.resolveCompatibilityResult(
-                                       
serializationProxy.restoreKeySerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       
serializationProxy.getKeySerializerConfigSnapshot(),
-                                       stateBackend.keySerializer)
-                                       .isRequiresMigration()) {
-
-                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
-                                       throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
-                                               "Aborting now since state 
migration is currently not available");
+                               if 
(!serializationProxy.getKeySerializerConfigSnapshot()
+                                               
.resolveSchemaCompatibility(stateBackend.keySerializer).isCompatibleAsIs()) {
+                                       throw new StateMigrationException("The 
new key serializer must be compatible.");
                                }
 
                                return 
serializationProxy.getStateMetaInfoSnapshots();
@@ -1622,13 +1599,15 @@ public static RocksIteratorWrapper getRocksIterator(
                        TypeSerializer<?> metaInfoTypeSerializer = 
restoredMetaInfoSnapshot.restoreTypeSerializer(serializerKey);
 
                        if (metaInfoTypeSerializer != 
byteOrderedElementSerializer) {
-                               CompatibilityResult<T> compatibilityResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       metaInfoTypeSerializer,
-                                       null,
-                                       
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey),
-                                       byteOrderedElementSerializer);
+                               @SuppressWarnings("unchecked")
+                               TypeSerializerSnapshot<T> serializerSnapshot = 
Preconditions.checkNotNull(
+                                       (TypeSerializerSnapshot<T>) 
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
+
+                               TypeSerializerSchemaCompatibility<T, ?> 
compatibilityResult =
+                                       
serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
 
-                               if (compatibilityResult.isRequiresMigration()) {
+                               // TODO implement proper migration for priority 
queue state
+                               if (compatibilityResult.isIncompatible()) {
                                        throw new 
FlinkRuntimeException(StateMigrationException.notSupported());
                                }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
index 872a58d2304..f813039ae35 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/migration/TypeSerializerSnapshotMigrationITCase.java
@@ -290,12 +290,12 @@ public int getCurrentVersion() {
                }
 
                @Override
-               public void write(DataOutputView out) throws IOException {
+               public void writeSnapshot(DataOutputView out) throws 
IOException {
                        out.writeUTF(configPayload);
                }
 
                @Override
-               public void read(int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) throws IOException {
+               public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader) throws IOException {
                        if (readVersion != 1) {
                                throw new IllegalStateException("Can not 
recognize read version: " + readVersion);
                        }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to