dawidwys commented on code in PR #21635: URL: https://github.com/apache/flink/pull/21635#discussion_r1072181884
########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -345,12 +345,15 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } - @Override - protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) { - return (this.componentClass == newSerializer.getComponentClass()) - ? OuterSchemaCompatibility.COMPATIBLE_AS_IS - : OuterSchemaCompatibility.INCOMPATIBLE; - } + @Override Review Comment: please fix the indentation ########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows: `TypeSerializerConfigSnapshot` implementation as will as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16 + +This section is a guide for method migration from serializer snapshots that existed before Flink 1.16. Review Comment: ```suggestion This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.17. ``` ########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows: `TypeSerializerConfigSnapshot` implementation as will as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16 Review Comment: ```suggestion ## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.17 ``` ########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows: `TypeSerializerConfigSnapshot` implementation as will as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16 + +This section is a guide for method migration from serializer snapshots that existed before Flink 1.16. + +Before Flink 1.16, when using a customized serializer to process data, the schema compatibility in the old serializer Review Comment: ```suggestion Before Flink 1.17, when using a customized serializer to process data, the schema compatibility in the old serializer ``` ########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows: `TypeSerializerConfigSnapshot` implementation as will as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16 + +This section is a guide for method migration from serializer snapshots that existed before Flink 1.16. + +Before Flink 1.16, when using a customized serializer to process data, the schema compatibility in the old serializer +(maybe in Flink library) has to meet the future need. +Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer has to be modified. +There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution +not supported in some scenarios. + +So from Flink 1.17, the direction of resolving schema compatibility has been reversed. The old method +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been marked as deprecated +and will be removed in the future. it is highly recommended to migrate from the old one to +`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this is as follows: Review Comment: ```suggestion `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. The steps to do this are as follows: ``` ########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java: ########## @@ -369,13 +417,13 @@ private void legacyInternalReadOuterSnapshot( } private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult( - TypeSerializer<?>[] newNestedSerializers, - TypeSerializerSnapshot<?>[] nestedSerializerSnapshots, + TypeSerializerSnapshot<?>[] newNestedSerializerSnapshot, Review Comment: `newNestedSerializerSnapshot` -> `newNestedSerializerSnapshots` ########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java: ########## @@ -290,26 +332,26 @@ protected void readOuterSnapshot( throws IOException {} /** - * Checks whether the outer snapshot is compatible with a given new serializer. + * Checks the schema compatibility of the given old serializer snapshot based on the outer + * snapshot. * - * <p>The base implementation of this method just returns {@code true}, i.e. it assumes that the - * outer serializer only has nested serializers and no extra information, and therefore the - * result of the check must always be true. Otherwise, if the outer serializer contains some + * <p>The base implementation of this method assumes that the outer serializer only has nested + * serializers and no extra information, and therefore the result of the check is {@link + * OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains some * extra information that has been persisted as part of the serializer snapshot, this must be * overridden. Note that this method and the corresponding methods {@link * #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, * ClassLoader)} needs to be implemented. * - * @param newSerializer the new serializer, which contains the new outer information to check - * against. - * @return a flag indicating whether or not the new serializer's outer information is compatible - * with the one written in this snapshot. - * @deprecated this method is deprecated, and will be removed in the future. Please implement - * {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead. + * @param oldSerializerSnapshot the old serializer snapshot, which contains the old outer + * information to check against. + * @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer + * information is compatible, requires migration, or incompatible with the one written in + * this snapshot. */ - @Deprecated - protected boolean isOuterSnapshotCompatible(S newSerializer) { Review Comment: Do I see it correctly that you've removed this deprecated method? If so , please make sure this ends up in the release notes. ########## docs/content/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md: ########## @@ -446,4 +449,23 @@ migrate from the old abstractions. The steps to do this is as follows: `TypeSerializerConfigSnapshot` implementation as will as the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). +## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.16 Review Comment: @alpinegizmo Do you mind taking a look at the docs part? ########## flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java: ########## @@ -76,10 +80,26 @@ protected void readOuterSnapshot( this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); } + @Override + public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility( + TypeSerializerSnapshot<C[]> oldSerializerSnapshot) { + if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) { Review Comment: imo, there is something wrong with this branch. If the `oldSerializerSnapshot` is `GenericArraySerializerConfigSnapshot` we then call: `CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot` -> `CompositeTypeSerializerSnapshot#internalResolveSchemaCompatibility` -> `GenericArraySerializerSnapshot#resolveOuterSchemaCompatibility` where we try to cast the initial `GenericArraySerializerConfigSnapshot` to `GenericArraySerializerSnapshot` which will fail. Could you please verify that, preferably with a test? Or if it works point me to a test that checks that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org