dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1072181884


##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
         this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
     }
 
-    @Override
-    protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
-        return (this.componentClass == newSerializer.getComponentClass())
-            ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
-            : OuterSchemaCompatibility.INCOMPATIBLE;
-    }
+       @Override

Review Comment:
   please fix the indentation



##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16
+
+This section is a guide for method migration from serializer snapshots that existed before Flink 1.16.

Review Comment:
   ```suggestion
   This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.17.
   ```



##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16

Review Comment:
   ```suggestion
   ## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.17
   ```



##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16
+
+This section is a guide for method migration from serializer snapshots that existed before Flink 1.16.
+
+Before Flink 1.16, when using a customized serializer to process data, the schema compatibility in the old serializer

Review Comment:
   ```suggestion
   Before Flink 1.17, when using a customized serializer to process data, the schema compatibility in the old serializer
   ```



##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16
+
+This section is a guide for method migration from serializer snapshots that existed before Flink 1.16.
+
+Before Flink 1.16, when using a customized serializer to process data, the schema compatibility in the old serializer
+(maybe in Flink library) has to meet the future need.
+Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer has to be modified.
+There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution
+not supported in some scenarios.
+
+So from Flink 1.17, the direction of resolving schema compatibility has been reversed. The old method
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated
+and will be removed in the future. it is highly recommended to migrate from the old one to
+`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this is as follows:

Review Comment:
   ```suggestion
   `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows:
   ```



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java:
##########
@@ -369,13 +417,13 @@ private void legacyInternalReadOuterSnapshot(
     }
 
     private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
-            TypeSerializer<?>[] newNestedSerializers,
-            TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
+            TypeSerializerSnapshot<?>[] newNestedSerializerSnapshot,

Review Comment:
   `newNestedSerializerSnapshot` -> `newNestedSerializerSnapshots`



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java:
##########
@@ -290,26 +332,26 @@ protected void readOuterSnapshot(
             throws IOException {}
 
     /**
-     * Checks whether the outer snapshot is compatible with a given new serializer.
+     * Checks the schema compatibility of the given old serializer snapshot based on the outer
+     * snapshot.
      *
-     * <p>The base implementation of this method just returns {@code true}, i.e. it assumes that the
-     * outer serializer only has nested serializers and no extra information, and therefore the
-     * result of the check must always be true. Otherwise, if the outer serializer contains some
+     * <p>The base implementation of this method assumes that the outer serializer only has nested
+     * serializers and no extra information, and therefore the result of the check is {@link
+     * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some
      * extra information that has been persisted as part of the serializer snapshot, this must be
      * overridden. Note that this method and the corresponding methods {@link
      * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView,
      * ClassLoader)} needs to be implemented.
      *
-     * @param newSerializer the new serializer, which contains the new outer information to check
-     *     against.
-     * @return a flag indicating whether or not the new serializer's outer information is compatible
-     *     with the one written in this snapshot.
-     * @deprecated this method is deprecated, and will be removed in the future. Please implement
-     *     {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
+     * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer
+     *     information to check against.
+     * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer
+     *     information is compatible, requires migration, or incompatible with the one written in
+     *     this snapshot.
      */
-    @Deprecated
-    protected boolean isOuterSnapshotCompatible(S newSerializer) {

Review Comment:
   Do I see it correctly that you've removed this deprecated method? If so,
please make sure this ends up in the release notes.



##########
docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md:
##########
@@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows:
  `TypeSerializerConfigSnapshot` implementation as will as the
  `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
 
+## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16

Review Comment:
   @alpinegizmo Do you mind taking a look at the docs part?



##########
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java:
##########
@@ -76,10 +80,26 @@ protected void readOuterSnapshot(
         this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
     }
 
+    @Override
+    public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
+        if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) {

Review Comment:
   IMO there is something wrong with this branch. If `oldSerializerSnapshot` is a
   `GenericArraySerializerConfigSnapshot`, we then call
   `CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot` ->
   `CompositeTypeSerializerSnapshot#internalResolveSchemaCompatibility` ->
   `GenericArraySerializerSnapshot#resolveOuterSchemaCompatibility`, where we try to cast the
   initial `GenericArraySerializerConfigSnapshot` to `GenericArraySerializerSnapshot`, which
   will fail.
   
   Could you please verify that, preferably with a test? Or, if it works, point me to a test
   that checks it?
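   
   To make the concern concrete, here is a minimal, self-contained sketch of the failure mode.
   It uses stand-in classes only: `CastSketch`, `LegacyConfigSnapshot`, `NewSnapshot`, and the
   method names are hypothetical and are not the real Flink types or the code in this PR. It
   assumes the outer check casts its argument without an `instanceof` guard.
   
   ```java
   // Stand-in types only; these are NOT the real Flink classes.
   public class CastSketch {
       interface Snapshot {}

       // Plays the role of the legacy GenericArraySerializerConfigSnapshot.
       static class LegacyConfigSnapshot implements Snapshot {}

       // Plays the role of GenericArraySerializerSnapshot.
       static class NewSnapshot implements Snapshot {
           final Class<?> componentClass;

           NewSnapshot(Class<?> componentClass) {
               this.componentClass = componentClass;
           }

           // Unguarded variant: assumes the old snapshot has the same type as this one.
           boolean outerCompatibleUnchecked(Snapshot old) {
               NewSnapshot previous = (NewSnapshot) old; // throws for LegacyConfigSnapshot
               return previous.componentClass == componentClass;
           }

           // Guarded variant: casts only after an instanceof check.
           boolean outerCompatibleGuarded(Snapshot old) {
               return old instanceof NewSnapshot
                       && ((NewSnapshot) old).componentClass == componentClass;
           }
       }

       // Reports whether the unguarded check throws a ClassCastException for the given snapshot.
       static boolean throwsClassCast(Snapshot old) {
           try {
               new NewSnapshot(String.class).outerCompatibleUnchecked(old);
               return false;
           } catch (ClassCastException e) {
               return true;
           }
       }

       public static void main(String[] args) {
           System.out.println(throwsClassCast(new LegacyConfigSnapshot()));
           System.out.println(throwsClassCast(new NewSnapshot(String.class)));
       }
   }
   ```
   
   The guarded variant only shows one way to avoid the exception; how a legacy snapshot should
   actually be resolved (compatible, migration, or incompatible) is a separate decision for the PR.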



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
