dawidwys commented on code in PR #25357:
URL: https://github.com/apache/flink/pull/25357#discussion_r1765312051


##########
flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java:
##########
@@ -146,144 +144,93 @@ protected Class<?> resolveProxyClass(String[] interfaces)
     }
 
     /**
-     * This is maintained as a temporary workaround for FLINK-6869.
-     *
-     * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3
-     * they are properly specified, we still have to ignore them for now as their previous
-     * serialVersionUIDs will vary depending on the Scala version.
-     *
-     * <p>This can be removed once 1.2 is no longer supported.
+     * Workaround for bugs like e.g. FLINK-36318 where we serialize a class into a snapshot and then
+     * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old
+     * snapshot assuming the binary representation of the faulty class has not changed.
      */
-    private static final Set<String> scalaSerializerClassnames = new HashSet<>();
+    private static final class VersionMismatchHandler {
 
-    static {
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
-    }
+        private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass =
+                new HashMap<>();
 
-    /**
-     * The serialVersionUID might change between Scala versions and since those classes are part of
-     * the tuple serializer config snapshots we need to ignore them.
-     *
-     * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
-     */
-    private static final Set<String> scalaTypes = new HashSet<>();
+        void addVersionsMatch(
+                String className, long localVersionUID, List<Long> streamVersionUIDs) {
+            supportedSerialVersionUidsPerClass
+                    .computeIfAbsent(className, k -> new HashMap<>())
+                    .put(localVersionUID, streamVersionUIDs);
+        }
 
-    static {
-        scalaTypes.add("scala.Tuple1");
-        scalaTypes.add("scala.Tuple2");
-        scalaTypes.add("scala.Tuple3");
-        scalaTypes.add("scala.Tuple4");
-        scalaTypes.add("scala.Tuple5");
-        scalaTypes.add("scala.Tuple6");
-        scalaTypes.add("scala.Tuple7");
-        scalaTypes.add("scala.Tuple8");
-        scalaTypes.add("scala.Tuple9");
-        scalaTypes.add("scala.Tuple10");
-        scalaTypes.add("scala.Tuple11");
-        scalaTypes.add("scala.Tuple12");
-        scalaTypes.add("scala.Tuple13");
-        scalaTypes.add("scala.Tuple14");
-        scalaTypes.add("scala.Tuple15");
-        scalaTypes.add("scala.Tuple16");
-        scalaTypes.add("scala.Tuple17");
-        scalaTypes.add("scala.Tuple18");
-        scalaTypes.add("scala.Tuple19");
-        scalaTypes.add("scala.Tuple20");
-        scalaTypes.add("scala.Tuple21");
-        scalaTypes.add("scala.Tuple22");
-        scalaTypes.add("scala.Tuple1$mcJ$sp");
-        scalaTypes.add("scala.Tuple1$mcI$sp");
-        scalaTypes.add("scala.Tuple1$mcD$sp");
-        scalaTypes.add("scala.Tuple2$mcJJ$sp");
-        scalaTypes.add("scala.Tuple2$mcJI$sp");
-        scalaTypes.add("scala.Tuple2$mcJD$sp");
-        scalaTypes.add("scala.Tuple2$mcIJ$sp");
-        scalaTypes.add("scala.Tuple2$mcII$sp");
-        scalaTypes.add("scala.Tuple2$mcID$sp");
-        scalaTypes.add("scala.Tuple2$mcDJ$sp");
-        scalaTypes.add("scala.Tuple2$mcDI$sp");
-        scalaTypes.add("scala.Tuple2$mcDD$sp");
-        scalaTypes.add("scala.Enumeration$ValueSet");
-    }
-
-    private static boolean isAnonymousClass(Class clazz) {
-        final String name = clazz.getName();
-
-        // isAnonymousClass does not work for anonymous Scala classes; additionally check by class
-        // name
-        if (name.contains("$anon$") || name.contains("$anonfun") || name.contains("$macro$")) {
-            return true;
-        }
-
-        // calling isAnonymousClass or getSimpleName can throw InternalError for certain Scala
-        // types, see https://issues.scala-lang.org/browse/SI-2034
-        // until we move to JDK 9, this try-catch is necessary
-        try {
-            return clazz.isAnonymousClass();
-        } catch (InternalError e) {
-            return false;
+        /**
+         * Checks if the local version of the given class can safely deserialize the class of a
+         * different version from the object stream.
+         */
+        boolean shouldTolerateSerialVersionMismatch(
+                String className, long localVersionUID, long streamVersionUID) {
+            return supportedSerialVersionUidsPerClass
+                    .getOrDefault(className, Collections.emptyMap())
+                    .getOrDefault(localVersionUID, Collections.emptyList())
+                    .contains(streamVersionUID);
+        }
+
+        /**
+         * Checks if there are any rules for the given class. This lets us decide early if we need
+         * to look up the local class.
+         */
+        boolean haveRulesForClass(String className) {
+            return supportedSerialVersionUidsPerClass.containsKey(className);
         }
     }
 
-    private static boolean isOldAvroSerializer(String name, long serialVersionUID) {
-        // please see FLINK-11436 for details on why we need to ignore serial version UID here for
-        // the AvroSerializer
-        return (serialVersionUID == 1)
-                && "org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name);
+    private static final VersionMismatchHandler versionMismatchHandler =
+            new VersionMismatchHandler();
+
+    static {
+        // See FLINK-36318
+        versionMismatchHandler.addVersionsMatch(
+                "org.apache.flink.table.runtime.typeutils.MapDataSerializer",
+                4073842523628732956L,
+                Collections.singletonList(2533002123505507000L));
     }
 
     /**
-     * A mapping between the full path of a deprecated serializer and its equivalent. These mappings
-     * are hardcoded and fixed.
-     *
-     * <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path". As
-     * an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for
-     * 1.5, and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 ->
-     * Flink-{>= 1.6}.
+     * An {@link ObjectInputStream} that ignores certain serialVersionUID mismatches. This is a
+     * workaround for uncontrolled serialVersionUIDs changes.
      */
-    private enum MigrationUtil {

Review Comment:
   Those were unused. They were missed during previous cleanups.
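
For context, a rough, self-contained sketch of how a rule table like the `VersionMismatchHandler` above can be consulted while deserializing is shown below. It is illustrative only, not the code added in this PR: the class names `SerialVersionMismatchSketch` and `TolerantObjectInputStream`, and wiring the check through a `readClassDescriptor()` override, are assumptions; the whitelisted class and UIDs mirror the FLINK-36318 rule registered in the hunk above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SerialVersionMismatchSketch {

    /** Stand-in for the PR's handler: class name -> local UID -> tolerated stream UIDs. */
    static final Map<String, Map<Long, List<Long>>> RULES = new HashMap<>();

    static {
        // Mirrors the FLINK-36318 rule registered in the diff above.
        RULES.computeIfAbsent(
                        "org.apache.flink.table.runtime.typeutils.MapDataSerializer",
                        k -> new HashMap<>())
                .put(4073842523628732956L, Collections.singletonList(2533002123505507000L));
    }

    static boolean tolerate(String className, long localUid, long streamUid) {
        return RULES.getOrDefault(className, Collections.emptyMap())
                .getOrDefault(localUid, Collections.emptyList())
                .contains(streamUid);
    }

    /** Hypothetical stream that swaps in the local descriptor for whitelisted mismatches. */
    static class TolerantObjectInputStream extends ObjectInputStream {

        TolerantObjectInputStream(InputStream in) throws IOException {
            super(in);
        }

        @Override
        protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
            ObjectStreamClass streamDescriptor = super.readClassDescriptor();
            String className = streamDescriptor.getName();

            // Cheap early exit: only classes with registered rules warrant loading the local class.
            if (!RULES.containsKey(className)) {
                return streamDescriptor;
            }

            Class<?> localClass = Class.forName(className, false, getClass().getClassLoader());
            ObjectStreamClass localDescriptor = ObjectStreamClass.lookup(localClass);
            if (localDescriptor == null) {
                // Local class is not serializable; let the default mechanism report the problem.
                return streamDescriptor;
            }

            long streamUid = streamDescriptor.getSerialVersionUID();
            long localUid = localDescriptor.getSerialVersionUID();
            if (streamUid != localUid && tolerate(className, localUid, streamUid)) {
                // Pretend the stream carried the local descriptor, so deserialization
                // proceeds as if the serialVersionUIDs had matched all along.
                return localDescriptor;
            }
            return streamDescriptor;
        }
    }
}
```

Returning the local descriptor instead of the one read from the stream is what lets deserialization continue as if the serialVersionUIDs had matched; the rest of the binary layout must still line up, which is exactly the assumption spelled out in the new javadoc.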



##########
flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java:
##########
@@ -146,144 +144,93 @@ protected Class<?> resolveProxyClass(String[] interfaces)
     }
 
     /**
-     * This is maintained as a temporary workaround for FLINK-6869.
-     *
-     * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID. Although since 1.3
-     * they are properly specified, we still have to ignore them for now as their previous
-     * serialVersionUIDs will vary depending on the Scala version.
-     *
-     * <p>This can be removed once 1.2 is no longer supported.
+     * Workaround for bugs like e.g. FLINK-36318 where we serialize a class into a snapshot and then
+     * its serialVersionUID is changed in an uncontrolled way. This lets us deserialize the old
+     * snapshot assuming the binary representation of the faulty class has not changed.
      */
-    private static final Set<String> scalaSerializerClassnames = new HashSet<>();
+    private static final class VersionMismatchHandler {
 
-    static {
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
-        scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
-    }
+        private final Map<String, Map<Long, List<Long>>> supportedSerialVersionUidsPerClass =
+                new HashMap<>();
 
-    /**
-     * The serialVersionUID might change between Scala versions and since those classes are part of
-     * the tuple serializer config snapshots we need to ignore them.
-     *
-     * @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
-     */
-    private static final Set<String> scalaTypes = new HashSet<>();
+        void addVersionsMatch(
+                String className, long localVersionUID, List<Long> streamVersionUIDs) {
+            supportedSerialVersionUidsPerClass
+                    .computeIfAbsent(className, k -> new HashMap<>())
+                    .put(localVersionUID, streamVersionUIDs);
+        }
 
-    static {

Review Comment:
   Those were unused. They were missed during previous cleanups.


