[ 
https://issues.apache.org/jira/browse/FLINK-8750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378989#comment-16378989
 ] 

ASF GitHub Bot commented on FLINK-8750:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001301
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 ---
    @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, 
Class<?> eventClass, ClassLoad
                try {
                        int type = buffer.getInt();
     
    -                   switch (type) {
    -                           case END_OF_PARTITION_EVENT:
    -                                   return 
eventClass.equals(EndOfPartitionEvent.class);
    -                           case CHECKPOINT_BARRIER_EVENT:
    -                                   return 
eventClass.equals(CheckpointBarrier.class);
    -                           case END_OF_SUPERSTEP_EVENT:
    -                                   return 
eventClass.equals(EndOfSuperstepEvent.class);
    -                           case CANCEL_CHECKPOINT_MARKER_EVENT:
    -                                   return 
eventClass.equals(CancelCheckpointMarker.class);
    -                           case OTHER_EVENT:
    -                                   try {
    -                                           final DataInputDeserializer 
deserializer = new DataInputDeserializer(buffer);
    -                                           final String className = 
deserializer.readUTF();
    -
    -                                           final Class<? extends 
AbstractEvent> clazz;
    -                                           try {
    -                                                   clazz = 
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
    -                                           }
    -                                           catch (ClassNotFoundException 
e) {
    -                                                   throw new 
IOException("Could not load event class '" + className + "'.", e);
    -                                           }
    -                                           catch (ClassCastException e) {
    -                                                   throw new 
IOException("The class '" + className + "' is not a valid subclass of '"
    -                                                           + 
AbstractEvent.class.getName() + "'.", e);
    -                                           }
    -                                           return eventClass.equals(clazz);
    -                                   }
    -                                   catch (Exception e) {
    -                                           throw new IOException("Error 
while deserializing or instantiating event.", e);
    -                                   }
    -                           default:
    -                                   throw new IOException("Corrupt byte 
stream for event");
    +                   if (eventClass.equals(EndOfPartitionEvent.class)) {
    +                           return type == END_OF_PARTITION_EVENT;
    +                   } else if (eventClass.equals(CheckpointBarrier.class)) {
    +                           return type == CHECKPOINT_BARRIER_EVENT;
    +                   } else if 
(eventClass.equals(EndOfSuperstepEvent.class)) {
    +                           return type == END_OF_SUPERSTEP_EVENT;
    +                   } else if 
(eventClass.equals(CancelCheckpointMarker.class)) {
    +                           return type == CANCEL_CHECKPOINT_MARKER_EVENT;
    +                   } else {
    +                           throw new IOException("Corrupt byte stream for 
event or unsupported eventClass = " + eventClass);
    --- End diff --
    
    Actually, this should be an `UnsupportedOperationException` since this is 
only based on the class being given and not the input stream.


> InputGate may contain data after an EndOfPartitionEvent
> -------------------------------------------------------
>
>                 Key: FLINK-8750
>                 URL: https://issues.apache.org/jira/browse/FLINK-8750
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The travis run at https://travis-ci.org/apache/flink/jobs/344425772 indicates 
> that there was still some data after an {{EndOfPartitionEvent}} or that 
> {{BufferOrEvent#moreAvailable}} contained the wrong value:
> {code}
> testOutputWithoutPk(org.apache.flink.table.runtime.stream.table.JoinITCase)  
> Time elapsed: 4.611 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: null
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>       at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:292)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to