Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001301
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
    @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, 
Class<?> eventClass, ClassLoad
                try {
                        int type = buffer.getInt();
     
    -                   switch (type) {
    -                           case END_OF_PARTITION_EVENT:
    -                                   return 
eventClass.equals(EndOfPartitionEvent.class);
    -                           case CHECKPOINT_BARRIER_EVENT:
    -                                   return 
eventClass.equals(CheckpointBarrier.class);
    -                           case END_OF_SUPERSTEP_EVENT:
    -                                   return 
eventClass.equals(EndOfSuperstepEvent.class);
    -                           case CANCEL_CHECKPOINT_MARKER_EVENT:
    -                                   return 
eventClass.equals(CancelCheckpointMarker.class);
    -                           case OTHER_EVENT:
    -                                   try {
    -                                           final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
    -                                           final String className = 
deserializer.readUTF();
    -
    -                                           final Class<? extends 
AbstractEvent> clazz;
    -                                           try {
    -                                                   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
    -                                           }
    -                                           catch (ClassNotFoundException 
e) {
    -                                                   throw new 
IOException("Could not load event class '" + className + "'.", e);
    -                                           }
    -                                           catch (ClassCastException e) {
    -                                                   throw new 
IOException("The class '" + className + "' is not a valid subclass of '"
    -                                                           + 
AbstractEvent.class.getName() + "'.", e);
    -                                           }
    -                                           return eventClass.equals(clazz);
    -                                   }
    -                                   catch (Exception e) {
    -                                           throw new IOException("Error 
while deserializing or instantiating event.", e);
    -                                   }
    -                           default:
    -                                   throw new IOException("Corrupt byte 
stream for event");
    +                   if (eventClass.equals(EndOfPartitionEvent.class)) {
    +                           return type == END_OF_PARTITION_EVENT;
    +                   } else if (eventClass.equals(CheckpointBarrier.class)) {
    +                           return type == CHECKPOINT_BARRIER_EVENT;
    +                   } else if 
(eventClass.equals(EndOfSuperstepEvent.class)) {
    +                           return type == END_OF_SUPERSTEP_EVENT;
    +                   } else if 
(eventClass.equals(CancelCheckpointMarker.class)) {
    +                           return type == CANCEL_CHECKPOINT_MARKER_EVENT;
    +                   } else {
    +                           throw new IOException("Corrupt byte stream for 
event or unsupported eventClass = " + eventClass);
    --- End diff --
    
    Actually, this should be an `UnsupportedOperationException` since this is 
only based on the class being given and not the input stream.


---

Reply via email to