Hi Flavio! This issue is known and has already been fixed. It occurs when you use custom types in collect(), because the wrong classloader/serializer is used to transfer them.
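For readers unfamiliar with this class of problem, here is a minimal sketch of the classloader side of it in plain Java. It is not Flink's code and not the actual fix. The stack trace in the quoted message shows the JobManager deserializing the accumulator through commons-lang3 SerializationUtils.deserialize; the sketch only illustrates the general technique of telling Java deserialization which class loader to use. The names ClassLoaderAwareDeserializer, LoaderAwareStream, and userCodeLoader are hypothetical and chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper for illustration only; this is not Flink's implementation.
final class ClassLoaderAwareDeserializer {

    /** ObjectInputStream that resolves classes against a caller-supplied class loader. */
    private static final class LoaderAwareStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Try the supplied (user-code) loader first.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a value, resolving its classes through userCodeLoader. */
    static Object deserialize(byte[] data, ClassLoader userCodeLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareStream(new ByteArrayInputStream(data), userCodeLoader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip a list; in a cluster setting the loader would be the one
        // that loaded the user's job jar (an assumption for this sketch).
        ClassLoader loader = ClassLoaderAwareDeserializer.class.getClassLoader();
        byte[] data = serialize(new ArrayList<>(Arrays.asList("a", "b")));
        System.out.println(deserialize(data, loader));
    }
}
```

In this sketch the loader is passed in explicitly, so the caller decides which classes are visible during deserialization instead of relying on whatever loader the deserializing component happens to use.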
The current master should not have this issue any more.

Greetings,
Stephan

On Mon, May 4, 2015 at 2:09 PM, Flavio Baronti <f.baro...@list-group.com> wrote:

> Hello,
>
> I'm testing the new DataSet.collect() method on version 0.9-milestone-1, but
> I obtain the following error on cluster execution (no problem with local
> execution), which also causes the job manager to crash:
>
> 14:05:41,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at main(Test01.java:54)) -> FlatMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
> 14:05:41,211 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16) (1/1) (attempt #0) to india3
> 14:05:41,269 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May 04 14:05:39 CEST 2015) changed to FINISHED.
> 14:05:41,284 ERROR akka.actor.OneForOneStrategy - java.io.StreamCorruptedException: invalid type code: 00
> org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid type code: 00
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
>     at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
>     at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
>     at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
>     at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
>     ... 24 more
> 14:05:41,290 INFO  org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager#-828467473.
> 14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager - Actor akka://flink/user/jobmanager#-828467473 terminated, stopping process...
>
> Is this a known issue? Am I doing something wrong?
>
> Thanks
> Flavio