[ https://issues.apache.org/jira/browse/FLINK-2608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689820#comment-15689820 ]

ASF GitHub Bot commented on FLINK-2608:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2623
  
    @mxm @chermenin Can we please do a follow-up to this, given that it is already merged?
    
    The tests in `GroupReduceITCase` need to be replaced by proper, targeted unit tests.
    
    Sorry for being super strict, but our build times are already exploding, and it is exactly cases like this one (adding distributed-runtime integration tests to test a serializer) that contribute a lot to that.
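
For illustration, such a targeted test could exercise the CollectionInputFormat serialization round trip directly, without starting a cluster. Below is a minimal sketch; the test class, its structure, and the choice of helpers (InstantiationUtil, TypeExtractor, GenericInputSplit, JUnit) are illustrative assumptions, not the tests that were actually added in the follow-up:

{code:java}
import static org.junit.Assert.assertEquals;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;

public class CollectionInputFormatArraysAsListTest {

    @Test
    public void testArraysAsListSurvivesInputFormatSerialization() throws Exception {
        // Arrays.asList(..) returns java.util.Arrays$ArrayList, the case from the bug report.
        TestClass element = new TestClass(Arrays.asList(0, 0, 0));

        TypeInformation<TestClass> typeInfo = TypeExtractor.getForClass(TestClass.class);
        CollectionInputFormat<TestClass> format = new CollectionInputFormat<>(
                Collections.singletonList(element),
                typeInfo.createSerializer(new ExecutionConfig()));

        // Round-trip the input format through Java serialization, which is what the
        // JobManager does when it initializes the DataSource task.
        byte[] bytes = InstantiationUtil.serializeObject(format);
        CollectionInputFormat<TestClass> copy =
                InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());

        // Read the single element back and check that the list survived the round trip.
        copy.open(new GenericInputSplit(0, 1));
        TestClass result = copy.nextRecord(null);
        assertEquals(Arrays.asList(0, 0, 0), result.integerList);
    }

    /** Same shape as the TestClass from the bug report below. */
    public static class TestClass implements Serializable {
        private static final long serialVersionUID = 1L;

        List<Integer> integerList;

        TestClass(List<Integer> integerList) {
            this.integerList = integerList;
        }
    }
}
{code}

The idea is that the path that fails in the stack trace of the quoted report (InstantiationUtil deserializing the InputFormat on the JobManager) is exercised in a plain JUnit test rather than through a full job submission.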



> Arrays.asList(..) does not work with CollectionInputFormat
> ----------------------------------------------------------
>
>                 Key: FLINK-2608
>                 URL: https://issues.apache.org/jira/browse/FLINK-2608
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 0.9, 0.10.0
>            Reporter: Maximilian Michels
>            Assignee: Alexander Chermenin
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> When using Arrays.asList(..) as input for a CollectionInputFormat, the serialization/deserialization fails when deploying the task.
> See the following program:
> {code:java}
> import java.io.Serializable;
> import java.util.Arrays;
> import java.util.List;
> 
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.util.Collector;
> 
> public class WordCountExample {
> 
>     public static void main(String[] args) throws Exception {
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>         DataSet<String> text = env.fromElements(
>                 "Who's there?",
>                 "I think I hear them. Stand, ho! Who's there?");
> 
>         // DOES NOT WORK
>         List<Integer> elements = Arrays.asList(0, 0, 0);
>         // The following works:
>         //List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
> 
>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> 
>         DataSet<Tuple2<String, Integer>> wordCounts = text
>                 .flatMap(new LineSplitter())
>                 .withBroadcastSet(set, "set")
>                 .groupBy(0)
>                 .sum(1);
> 
>         wordCounts.print();
>     }
> 
>     public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>             for (String word : line.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> 
>     public static class TestClass implements Serializable {
>         private static final long serialVersionUID = -2932037991574118651L;
> 
>         List<Integer> integerList;
> 
>         public TestClass(List<Integer> integerList) {
>             this.integerList = integerList;
>         }
>     }
> }
> {code}
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>     ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>     ... 26 more
> {noformat}
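>
> A possible workaround on the user side, along the lines of the commented-out alternative in the program above, is to copy the fixed-size list returned by Arrays.asList(..) into a plain java.util.ArrayList before passing it on. A sketch (a fragment of the program above, for illustration only):
> {code:java}
> // Copy into a modifiable java.util.ArrayList; this variant works, as noted above.
> List<Integer> elements = new ArrayList<>(Arrays.asList(0, 0, 0));
> DataSet<TestClass> set = env.fromElements(new TestClass(elements));
> {code}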



