Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/5588#discussion_r171001301 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java --- @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoad try { int type = buffer.getInt(); - switch (type) { - case END_OF_PARTITION_EVENT: - return eventClass.equals(EndOfPartitionEvent.class); - case CHECKPOINT_BARRIER_EVENT: - return eventClass.equals(CheckpointBarrier.class); - case END_OF_SUPERSTEP_EVENT: - return eventClass.equals(EndOfSuperstepEvent.class); - case CANCEL_CHECKPOINT_MARKER_EVENT: - return eventClass.equals(CancelCheckpointMarker.class); - case OTHER_EVENT: - try { - final DataInputDeserializer deserializer = new DataInputDeserializer(buffer); - final String className = deserializer.readUTF(); - - final Class<? extends AbstractEvent> clazz; - try { - clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class); - } - catch (ClassNotFoundException e) { - throw new IOException("Could not load event class '" + className + "'.", e); - } - catch (ClassCastException e) { - throw new IOException("The class '" + className + "' is not a valid subclass of '" - + AbstractEvent.class.getName() + "'.", e); - } - return eventClass.equals(clazz); - } - catch (Exception e) { - throw new IOException("Error while deserializing or instantiating event.", e); - } - default: - throw new IOException("Corrupt byte stream for event"); + if (eventClass.equals(EndOfPartitionEvent.class)) { + return type == END_OF_PARTITION_EVENT; + } else if (eventClass.equals(CheckpointBarrier.class)) { + return type == CHECKPOINT_BARRIER_EVENT; + } else if (eventClass.equals(EndOfSuperstepEvent.class)) { + return type == END_OF_SUPERSTEP_EVENT; + } else if (eventClass.equals(CancelCheckpointMarker.class)) { + return type == CANCEL_CHECKPOINT_MARKER_EVENT; + } else { + throw new 
IOException("Corrupt byte stream for event or unsupported eventClass = " + eventClass); --- End diff -- Actually, this should be an `UnsupportedOperationException`, since the failure depends only on the class that was passed in and not on the input stream.
---