Hi Ventura!

You are distributing your data via something like "env.fromElements(...)" or "env.fromCollection(...)", is that correct?
The master node (JobManager) currently takes each InputFormat and checks whether it needs some "master-side initialization". For file input formats, this computes, for example, the splits of the file(s) that the parallel tasks will read. For inputs like "env.fromElements(...)" or "env.fromCollection(...)" this is redundant, since there is nothing to coordinate; the initialization check simply runs for all inputs. It would be a good idea to skip it for collection inputs.

If you want to avoid that this happens on the JobManager, the simplest way would be to make the data source independent of the CUDA types:

- Define the source as a Tuple2 holding the row and column dimensions:

    DataSet<Tuple2<Integer, Integer>> source =
        env.fromElements(new Tuple2<>(...), new Tuple2<>(...));

- Transform the tuples into your CUDA types. Since that source is not parallel (Java/Scala collections always run with parallelism 1), make sure you tell the system to go parallel after that:

    DataSet<GpuDataRegion> data = source
        .map((tuple) -> { /* your initialization code */ })
        .setParallelism(64); // makes the mapper run with 64 parallel instances
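Putting the two steps together, here is a minimal sketch. GpuDataRegion.allocate is a placeholder for however you actually construct your GPU-backed type, and the dimensions are made up; everything else is standard DataSet API:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class GpuSourceSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Plain (rows, cols) tuples: nothing CUDA-related ends up in the
            // CollectionInputFormat, so the JobManager can deserialize it safely.
            DataSet<Tuple2<Integer, Integer>> dimensions = env.fromElements(
                    new Tuple2<>(1024, 768),
                    new Tuple2<>(2048, 2048));

            // The collection source itself runs with parallelism 1; the mapper
            // that touches the GPU runs with 64 parallel instances on the workers.
            DataSet<GpuDataRegion> data = dimensions
                    .map(new MapFunction<Tuple2<Integer, Integer>, GpuDataRegion>() {
                        @Override
                        public GpuDataRegion map(Tuple2<Integer, Integer> dims) {
                            return GpuDataRegion.allocate(dims.f0, dims.f1); // placeholder factory
                        }
                    })
                    .setParallelism(64);

            // ... continue building the job with 'data' ...
        }
    }

I spelled the mapper out as an anonymous MapFunction (rather than a lambda) so that the type extractor can infer GpuDataRegion without an extra type hint.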
Out of curiosity: the deserialization bug occurs here on the JobManager (because the JobManager looks into the inputs), but I assume it would also occur on the TaskManagers (workers) once the actual execution starts? How is Core.CudaExecutor usually initialized, so that it is not null when you need it?

Greetings,
Stephan

On Wed, Apr 22, 2015 at 5:50 PM, Ventura Del Monte <venturadelmo...@gmail.com> wrote:

> I am using Flink 0.9-SNAPSHOT; this is the complete stack trace:
>
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.autoencoder.AutoEncoderParam@352c308]) failed: unread block data
>     at org.apache.flink.client.program.Client.run(Client.java:378)
>     at org.apache.flink.client.program.Client.run(Client.java:314)
>     at org.apache.flink.client.program.Client.run(Client.java:307)
>     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
>     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
>     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:70)
>     at org.dl4flink.DL4Flink.RunFlinkJob(DL4Flink.java:295)
>     at org.dl4flink.DL4Flink.main(DL4Flink.java:56)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at <init>(DownpourSDG.java:28) (org.apache.flink.api.java.io.CollectionInputFormat))': Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:527)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:511)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:511)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:197)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:44)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat ([org.dl4flink.dl.neuralnets.models.AutoEncoder.AutoEncoderParam@352c308]) failed: unread block data
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:60)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$2.apply(JobManager.scala:524)
>     ... 25 more
> Caused by: java.lang.IllegalStateException: unread block data
>     at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2424)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:57)
>     ... 26 more
>
> I checked the JobManager log, and I know that the object needed for the deserialization is null.
>
> public void kryoDeserialize(Kryo kryo, Input in)
> {
>     this.rows = in.readInt();
>     this.cols = in.readInt();
>     this.size = this.rows * this.cols;
>     double[] tmp = in.readDoubles(this.size);
>     Core.CudaExecutor.invoke((handle) -> cuMemAlloc(this.deviceData,
>             this.size * Sizeof.DOUBLE)); // here CudaExecutor is null on the JobManager
> }
>
> As you can see, deviceData is the transient field I need to store/read in a specific way, since it is a pointer to GPU memory.
>
> The object I need to deserialize is part of a broadcast set. I think this is the reason why the JobManager needs to read it (I figured it out after I sent my first mail).
> I am thinking over whether I should edit my code to get rid of this situation, since having the JobManager allocate GPU memory could be a drawback. What do you think about that?
>
> Thank you for your time!
>
>
> 2015-04-22 16:48 GMT+02:00 Till Rohrmann <trohrm...@apache.org>:
>
>> The corresponding code snippet could also help.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 22, 2015 at 4:45 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> which version of Flink are you using?
>>>
>>> Can you send us the complete stack trace of the error, to help us understand the exact location where the issue occurs?
>>>
>>> On Wed, Apr 22, 2015 at 4:33 PM, Ventura Del Monte <venturadelmo...@gmail.com> wrote:
>>>
>>>> Hello, I am working on a Flink-based deep learning library for my master's thesis, and I am currently running into the following issue: I have a Java class with a transient field, so I had to write both a custom Kryo serializer and a Java one. The (de)serialization needs to access another object of my system. If I run my software locally, everything works fine, because the needed object has been instantiated; when I run it in a remote environment, however, it crashes, because the object needed for the deserialization is not present on the JobManager when it receives the data. Thus, my question is: is it possible to let the JobManager execute some user code, or would it be better to change the architecture of my system to avoid this kind of problem?
>>>>
>>>> Regards,
>>>> Ventura
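For reference, a minimal sketch of the restructuring Ventura is considering above: the Kryo hooks only move the host-side data, and the device pointer is created lazily on first use on a TaskManager, so deserialization on the JobManager never touches CUDA. Everything here is hypothetical — the GpuMatrix class name, the hostData field, and the device() accessor are illustrative stand-ins; Core.CudaExecutor.invoke is used exactly as in the snippet quoted above:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import jcuda.Pointer;
    import jcuda.Sizeof;
    import jcuda.driver.CUdeviceptr;
    import static jcuda.driver.JCudaDriver.cuMemAlloc;
    import static jcuda.driver.JCudaDriver.cuMemcpyDtoH;
    import static jcuda.driver.JCudaDriver.cuMemcpyHtoD;

    public class GpuMatrix { // hypothetical stand-in for AutoEncoderParam
        private int rows, cols, size;
        private double[] hostData;                // heap copy, written/read by Kryo
        private transient CUdeviceptr deviceData; // device copy, created lazily

        public void kryoSerialize(Kryo kryo, Output out) {
            out.writeInt(this.rows);
            out.writeInt(this.cols);
            // Reuse the heap copy if present; otherwise pull the data back from the device.
            double[] host = (this.hostData != null) ? this.hostData : new double[this.size];
            if (this.hostData == null) {
                Core.CudaExecutor.invoke((handle) ->
                        cuMemcpyDtoH(Pointer.to(host), this.deviceData, this.size * Sizeof.DOUBLE));
            }
            out.writeDoubles(host);
        }

        public void kryoDeserialize(Kryo kryo, Input in) {
            this.rows = in.readInt();
            this.cols = in.readInt();
            this.size = this.rows * this.cols;
            this.hostData = in.readDoubles(this.size); // heap only -- no CUDA calls here
            this.deviceData = null;                    // allocated lazily in device()
        }

        // Called from task code on the TaskManagers, where CudaExecutor is initialized.
        private CUdeviceptr device() {
            if (this.deviceData == null) {
                CUdeviceptr ptr = new CUdeviceptr();
                Core.CudaExecutor.invoke((handle) -> cuMemAlloc(ptr, this.size * Sizeof.DOUBLE));
                Core.CudaExecutor.invoke((handle) ->
                        cuMemcpyHtoD(ptr, Pointer.to(this.hostData), this.size * Sizeof.DOUBLE));
                this.deviceData = ptr;
                this.hostData = null; // drop the heap copy once the data is on the device
            }
            return this.deviceData;
        }
    }

With this split, the broadcast set can be deserialized anywhere (including on the JobManager during initializeOnMaster), and GPU memory is only ever allocated where it is actually used.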