dawidwys commented on code in PR #25357: URL: https://github.com/apache/flink/pull/25357#discussion_r1765312051
########## flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java: ########## @@ -146,144 +144,93 @@ protected Class<?> resolveProxyClass(String[] interfaces) } /** - * This is maintained as a temporary workaround for FLINK-6869. - * - * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3 - * they are properly specified, we still have to ignore them for now as their previous - * serialVersionUIDs will vary depending on the Scala version. - * - * <p>This can be removed once 1.2 is no longer supported. + * Workaround for bugs like e.g. FLINK-36318 where we serialize a class into a snapshot and then + * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old + * snapshot assuming the binary representation of the faulty class has not changed. */ - private static final Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final class VersionMismatchHandler { - static { - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); - } + private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass = + new HashMap<>(); - /** - * The serialVersionUID might change between Scala versions and since those classes are part of - * the tuple serializer config snapshots we need to ignore them. 
- * - * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> - */ - private static final Set<String> scalaTypes = new HashSet<>(); + void addVersionsMatch( + String className, long localVersionUID, List<Long> streamVersionUIDs) { + supportedSerialVersionUidsPerClass + .computeIfAbsent(className, k -> new HashMap<>()) + .put(localVersionUID, streamVersionUIDs); + } - static { - scalaTypes.add("scala.Tuple1"); - scalaTypes.add("scala.Tuple2"); - scalaTypes.add("scala.Tuple3"); - scalaTypes.add("scala.Tuple4"); - scalaTypes.add("scala.Tuple5"); - scalaTypes.add("scala.Tuple6"); - scalaTypes.add("scala.Tuple7"); - scalaTypes.add("scala.Tuple8"); - scalaTypes.add("scala.Tuple9"); - scalaTypes.add("scala.Tuple10"); - scalaTypes.add("scala.Tuple11"); - scalaTypes.add("scala.Tuple12"); - scalaTypes.add("scala.Tuple13"); - scalaTypes.add("scala.Tuple14"); - scalaTypes.add("scala.Tuple15"); - scalaTypes.add("scala.Tuple16"); - scalaTypes.add("scala.Tuple17"); - scalaTypes.add("scala.Tuple18"); - scalaTypes.add("scala.Tuple19"); - scalaTypes.add("scala.Tuple20"); - scalaTypes.add("scala.Tuple21"); - scalaTypes.add("scala.Tuple22"); - scalaTypes.add("scala.Tuple1$mcJ$sp"); - scalaTypes.add("scala.Tuple1$mcI$sp"); - scalaTypes.add("scala.Tuple1$mcD$sp"); - scalaTypes.add("scala.Tuple2$mcJJ$sp"); - scalaTypes.add("scala.Tuple2$mcJI$sp"); - scalaTypes.add("scala.Tuple2$mcJD$sp"); - scalaTypes.add("scala.Tuple2$mcIJ$sp"); - scalaTypes.add("scala.Tuple2$mcII$sp"); - scalaTypes.add("scala.Tuple2$mcID$sp"); - scalaTypes.add("scala.Tuple2$mcDJ$sp"); - scalaTypes.add("scala.Tuple2$mcDI$sp"); - scalaTypes.add("scala.Tuple2$mcDD$sp"); - scalaTypes.add("scala.Enumeration$ValueSet"); - } - - private static boolean isAnonymousClass(Class clazz) { - final String name = clazz.getName(); - - // isAnonymousClass does not work for anonymous Scala classes; additionally check by class - // name - if (name.contains("$anon$") || name.contains("$anonfun") || 
name.contains("$macro$")) { - return true; - } - - // calling isAnonymousClass or getSimpleName can throw InternalError for certain Scala - // types, see https://issues.scala-lang.org/browse/SI-2034 - // until we move to JDK 9, this try-catch is necessary - try { - return clazz.isAnonymousClass(); - } catch (InternalError e) { - return false; + /** + * Checks if the local version of the given class can safely deserialize the class of a + * different version from the object stream. + */ + boolean shouldTolerateSerialVersionMismatch( + String className, long localVersionUID, long streamVersionUID) { + return supportedSerialVersionUidsPerClass + .getOrDefault(className, Collections.emptyMap()) + .getOrDefault(localVersionUID, Collections.emptyList()) + .contains(streamVersionUID); + } + + /** + * Checks if there are any rules for the given class. This lets us decide early if we need + * to look up the local class. + */ + boolean haveRulesForClass(String className) { + return supportedSerialVersionUidsPerClass.containsKey(className); } } - private static boolean isOldAvroSerializer(String name, long serialVersionUID) { - // please see FLINK-11436 for details on why we need to ignore serial version UID here for - // the AvroSerializer - return (serialVersionUID == 1) - && "org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name); + private static final VersionMismatchHandler versionMismatchHandler = + new VersionMismatchHandler(); + + static { + // See FLINK-36318 + versionMismatchHandler.addVersionsMatch( + "org.apache.flink.table.runtime.typeutils.MapDataSerializer", + 4073842523628732956L, + Collections.singletonList(2533002123505507000L)); } /** - * A mapping between the full path of a deprecated serializer and its equivalent. These mappings - * are hardcoded and fixed. - * - * <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path". 
As - * an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for - * 1.5, and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> - * Flink-{>= 1.6}. + * An {@link ObjectInputStream} that ignores certain serialVersionUID mismatches. This is a + * workaround for uncontrolled serialVersionUIDs changes. */ - private enum MigrationUtil { Review Comment: These were unused; they were missed during previous cleanups. ########## flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java: ########## @@ -146,144 +144,93 @@ protected Class<?> resolveProxyClass(String[] interfaces) } /** - * This is maintained as a temporary workaround for FLINK-6869. - * - * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3 - * they are properly specified, we still have to ignore them for now as their previous - * serialVersionUIDs will vary depending on the Scala version. - * - * <p>This can be removed once 1.2 is no longer supported. + * Workaround for bugs like e.g. FLINK-36318 where we serialize a class into a snapshot and then + * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old + * snapshot assuming the binary representation of the faulty class has not changed.
*/ - private static final Set<String> scalaSerializerClassnames = new HashSet<>(); + private static final class VersionMismatchHandler { - static { - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); - } + private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass = + new HashMap<>(); - /** - * The serialVersionUID might change between Scala versions and since those classes are part of - * the tuple serializer config snapshots we need to ignore them. - * - * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a> - */ - private static final Set<String> scalaTypes = new HashSet<>(); + void addVersionsMatch( + String className, long localVersionUID, List<Long> streamVersionUIDs) { + supportedSerialVersionUidsPerClass + .computeIfAbsent(className, k -> new HashMap<>()) + .put(localVersionUID, streamVersionUIDs); + } - static { Review Comment: These were unused; they were missed during previous cleanups. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org