Ok but that would not prevent the above error, right? Serializing is not the issue here.
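[Editor's note: the eager serialize-and-deserialize check discussed in this thread can be sketched with plain java.io. This is a minimal illustration, not Flink's implementation; the class and helper name (`SerializationCheck.roundTrip`) are made up for the example.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

public class SerializationCheck {

    // Serialize, then immediately deserialize, so a non-transferable object
    // fails when the program is constructed rather than when it is shipped.
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) ois.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<Integer> list = new ArrayList<>(Arrays.asList(0, 0, 0));
        System.out.println(roundTrip(list)); // prints [0, 0, 0]
    }
}
```

Note that this round trip runs on a single classloader, so it can only catch plain serialization failures; classloader-related deserialization problems on the cluster side would still slip through, which is the caveat raised in the replies below.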
Nevertheless, it would catch all errors during initial serialization. Deserializing has its own hazards due to possible Classloader issues.

On Wed, Sep 2, 2015 at 4:05 PM, Stephan Ewen <se...@apache.org> wrote:

> Yes, even serialize in the constructor. Then the failure (if serialization
> does not work) comes immediately.
>
> On Wed, Sep 2, 2015 at 4:02 PM, Maximilian Michels <m...@apache.org> wrote:
>>
>> Nice suggestion. So you want to serialize and deserialize the InputFormats
>> on the Client to check whether they can be transferred correctly? Merely
>> serializing is not enough because the above Exception occurs during
>> deserialization.
>>
>> On Wed, Sep 2, 2015 at 2:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> We should try to improve the exception here. More people will run into
>>> this issue and the exception should help them understand it well.
>>>
>>> How about we do eager serialization into a set of byte arrays? Then the
>>> serializability issue comes immediately when the program is constructed,
>>> rather than later, when it is shipped.
>>>
>>> On Wed, Sep 2, 2015 at 12:56 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>> Here's the JIRA issue: https://issues.apache.org/jira/browse/FLINK-2608
>>>>
>>>> On Wed, Sep 2, 2015 at 12:49 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>>
>>>>> Hi Andres,
>>>>>
>>>>> Thank you for reporting the problem and including the code to reproduce
>>>>> it. I think there is a problem with the class serialization or
>>>>> deserialization: Arrays.asList uses a private ArrayList class
>>>>> (java.util.Arrays$ArrayList), which is not the one you would normally use
>>>>> (java.util.ArrayList).
>>>>>
>>>>> I'll create a JIRA issue to keep track of the problem and to
>>>>> investigate further.
>>>>>
>>>>> Best regards,
>>>>> Max
>>>>>
>>>>> Here's the stack trace:
>>>>>
>>>>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at main(Test.java:32) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:523)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>> Caused by: java.lang.Exception: Deserializing the InputFormat ([mytests.Test$TestClass@4d6025c5]) failed: unread block data
>>>>>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:520)
>>>>>     ... 25 more
>>>>> Caused by: java.lang.IllegalStateException: unread block data
>>>>>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>>>>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>>>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>>>>>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>>>>>     ... 26 more
>>>>>
>>>>> On Wed, Sep 2, 2015 at 11:17 AM, Andres R. Masegosa <and...@cs.aau.dk> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I get a bug when trying to broadcast a list of integers created with
>>>>>> the primitive "Arrays.asList(...)".
>>>>>>
>>>>>> For example, if you try to run this "wordcount" example, you can
>>>>>> reproduce the bug.
>>>>>>
>>>>>> public class WordCountExample {
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         final ExecutionEnvironment env =
>>>>>>                 ExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>>         DataSet<String> text = env.fromElements(
>>>>>>                 "Who's there?",
>>>>>>                 "I think I hear them. Stand, ho! Who's there?");
>>>>>>
>>>>>>         List<Integer> elements = Arrays.asList(0, 0, 0);
>>>>>>
>>>>>>         DataSet<TestClass> set = env.fromElements(new TestClass(elements));
>>>>>>
>>>>>>         DataSet<Tuple2<String, Integer>> wordCounts = text
>>>>>>                 .flatMap(new LineSplitter())
>>>>>>                 .withBroadcastSet(set, "set")
>>>>>>                 .groupBy(0)
>>>>>>                 .sum(1);
>>>>>>
>>>>>>         wordCounts.print();
>>>>>>     }
>>>>>>
>>>>>>     public static class LineSplitter implements
>>>>>>             FlatMapFunction<String, Tuple2<String, Integer>> {
>>>>>>         @Override
>>>>>>         public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
>>>>>>             for (String word : line.split(" ")) {
>>>>>>                 out.collect(new Tuple2<String, Integer>(word, 1));
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static class TestClass implements Serializable {
>>>>>>         private static final long serialVersionUID = -2932037991574118651L;
>>>>>>
>>>>>>         List<Integer> integerList;
>>>>>>
>>>>>>         public TestClass(List<Integer> integerList) {
>>>>>>             this.integerList = integerList;
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> However, if instead of the primitive "Arrays.asList(...)" we use the
>>>>>> ArrayList<> constructor, there is no problem at all!
>>>>>>
>>>>>> Regards,
>>>>>> Andres
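[Editor's note: the class distinction behind this report can be observed directly. This short sketch is illustrative and not part of the thread.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AsListClassDemo {
    public static void main(String[] args) {
        List<Integer> viaAsList = Arrays.asList(0, 0, 0);
        List<Integer> viaConstructor = new ArrayList<>(Arrays.asList(0, 0, 0));

        // Arrays.asList returns a private nested class, not java.util.ArrayList
        System.out.println(viaAsList.getClass().getName());      // java.util.Arrays$ArrayList
        System.out.println(viaConstructor.getClass().getName()); // java.util.ArrayList
    }
}
```

This is why copying into `new ArrayList<>(...)` works around the bug Andres reports: the copy is an instance of the public `java.util.ArrayList`, so deserialization does not depend on resolving the private `Arrays$ArrayList` class.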