It works in the IDE because there we execute everything in the same JVM, so the mapper can access the correct value of the static variable. When you submit a job with the CLI frontend, at least two JVMs are involved, and code running in the JM/TM cannot access the value the static variable holds in the CLI frontend's JVM.
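This behavior can be illustrated with plain Java serialization, independent of Flink. The sketch below is hypothetical (the class and field names are made up for the example, and "resetting" the static merely simulates a fresh TaskManager JVM where the driver's assignment never happened): a mapper that reads a static field at `map()` time loses the value after a serialization round trip plus reset, while a mapper that captures the value in an instance field via its constructor keeps it, because instance fields are part of the serialized state and static fields are not.

```java
import java.io.*;

class StaticFieldDemo {
    // Mimics the static field of the enclosing class that the
    // anonymous mapper was reading (hypothetical name).
    static int arraySize = 5;

    // Problematic pattern: reads the static field when map() runs.
    static class StaticReadingMapper implements Serializable {
        int map() { return arraySize; }
    }

    // The fix: capture the value in an instance field via the constructor;
    // instance fields are serialized along with the object.
    static class ConstructorMapper implements Serializable {
        private final int size;
        ConstructorMapper(int size) { this.size = size; }
        int map() { return size; }
    }

    // Serialize and deserialize an object, as Flink does when shipping
    // user functions from the client to the TaskManagers.
    static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(obj);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
    }

    public static void main(String[] args) throws Exception {
        StaticReadingMapper m1 = roundTrip(new StaticReadingMapper());
        ConstructorMapper m2 = roundTrip(new ConstructorMapper(arraySize));

        // Simulate the remote JVM, where the client's assignment to the
        // static field never took place.
        arraySize = 0;

        System.out.println(m1.map()); // prints 0 -- static value was not shipped
        System.out.println(m2.map()); // prints 5 -- instance field survived serialization
    }
}
```

In the single-JVM IDE run both variants happen to work, because the static field is assigned and read in the same JVM; the difference only surfaces once the function object is serialized and executed elsewhere.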
On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:

> Hi everyone,
>
> Mihail and I have now solved the issue.
>
> The exception was caused because the array size in question was read from
> a static field of the enclosing class, inside an anonymous mapper. Making
> the mapper a standalone class and passing the array size to the constructor
> solved the issue.
>
> What I don't understand, though, is why this worked fine when the job was
> executed from inside the IDE. Is serialization handled differently
> (skipped) in this case?
>
> Cheers,
> Vasia.
>
> On 26 June 2015 at 11:30, Mihail Vieru <vi...@informatik.hu-berlin.de> wrote:
>
>> Hi Vasia,
>>
>> InitVerticesMapper is called in the run method of APSP:
>>
>> @Override
>> public Graph<K, Tuple2<Integer[],String>, NullValue> run(
>>         Graph<K, Tuple2<Integer[],String>, NullValue> input) {
>>
>>     VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>>     parameters.setSolutionSetUnmanagedMemory(false);
>>
>>     return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>>         .runVertexCentricIteration(
>>             new VertexDistanceUpdater<K, Tuple2<Integer[],String>, Integer>(srcVertexId),
>>             new MinDistanceMessenger<K, Tuple2<Integer[],String>, Integer, NullValue>(srcVertexId),
>>             maxIterations, parameters);
>> }
>>
>> I'll send you the full code via a private e-mail.
>>
>> Cheers,
>> Mihail
>>
>>
>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>
>> Hi Mihail,
>>
>> could you share your code, or at least the implementations of
>> getVerticesDataSet() and InitVerticesMapper, so I can take a look?
>> Where is InitVerticesMapper called above?
>>
>> Cheers,
>> Vasia.
>>
>>
>> On 26 June 2015 at 10:51, Mihail Vieru <vi...@informatik.hu-berlin.de> wrote:
>>
>>> Hi Robert,
>>>
>>> I'm using the same input data, as well as the same parameters I use in
>>> the IDE's run configuration.
>>> I don't run the job on the cluster (yet), but locally, by starting Flink
>>> with the start-local.sh script.
>>>
>>> I will try to explain my code a bit. The Integer[] array is
>>> initialized in the getVerticesDataSet() method.
>>>
>>> DataSet<Vertex<Integer, Tuple2<Integer[],String>>> vertices = getVerticesDataSet(env);
>>> ...
>>> Graph<Integer, Tuple2<Integer[],String>, NullValue> graph =
>>>     Graph.fromDataSet(vertices, edges, env);
>>> ...
>>> Graph<Integer, Tuple2<Integer[],String>, NullValue> intermediateGraph =
>>>     graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>>
>>> In APSP I'm accessing it in the InitVerticesMapper, but it is now
>>> suddenly empty.
>>>
>>> Best,
>>> Mihail
>>>
>>>
>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>
>>> Hi Mihail,
>>>
>>> the NPE has been thrown from
>>> graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess that
>>> is code written by you or a library you are using.
>>> Maybe the data you are using on the cluster is different from your local
>>> test data?
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru
>>> <vi...@informatik.hu-berlin.de> wrote:
>>>
>>>> Hi,
>>>>
>>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR in
>>>> the CLI.
>>>> This doesn't occur in the IDE.
>>>>
>>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>>> configuration Robert has provided here:
>>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>>> I specify the entry point using the "-c" option.
>>>>
>>>> The array the Exception refers to is actually initialized when a
>>>> vertices dataset is read from the file system.
>>>>
>>>> Any ideas on what could cause this issue?
>>>>
>>>> Best,
>>>> Mihail
>>>>
>>>> P.S.: the stack trace:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The
>>>> program execution failed: Job execution failed.
>>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)