Hi everyone,

Mihail and I have now solved the issue.

The exception occurred because the array size in question was read from a
static field of the enclosing class, inside an anonymous mapper. Making
the mapper a standalone class and passing the array size to its
constructor solved the issue.
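
For anyone hitting the same thing, here is a minimal sketch of the broken
pattern and of the fix (class and field names are illustrative, not the
actual APSP code):

    import org.apache.flink.api.common.functions.MapFunction;

    public class Example {

        // Set in main(), i.e. only in the client JVM. Static fields are
        // NOT serialized with a function, so after the mapper is shipped
        // to a TaskManager JVM this field still holds its default value 0.
        static int arraySize;

        // Broken: an anonymous mapper reading the static field of the
        // enclosing class. On the cluster it creates a zero-length array,
        // so the first access fails with ArrayIndexOutOfBoundsException: 0.
        static MapFunction<Long, Integer[]> brokenMapper() {
            return new MapFunction<Long, Integer[]>() {
                @Override
                public Integer[] map(Long id) {
                    return new Integer[arraySize]; // length 0 on the cluster
                }
            };
        }

        // Fix: a standalone mapper class; the size is an instance field
        // and travels inside the serialized function object.
        public static class InitMapper implements MapFunction<Long, Integer[]> {
            private final int arraySize;

            public InitMapper(int arraySize) {
                this.arraySize = arraySize;
            }

            @Override
            public Integer[] map(Long id) {
                return new Integer[arraySize];
            }
        }
    }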

What I don't understand, though, is why this worked fine when the job was
executed from inside the IDE. Is serialization handled differently (or
skipped) in that case?

Cheers,
Vasia.

On 26 June 2015 at 11:30, Mihail Vieru <vi...@informatik.hu-berlin.de>
wrote:

> Hi Vasia,
>
> *InitVerticesMapper* is called in the run method of APSP:
>
>     @Override
>     public Graph<K, Tuple2<Integer[], String>, NullValue> run(
>             Graph<K, Tuple2<Integer[], String>, NullValue> input) {
>
>         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>         parameters.setSolutionSetUnmanagedMemory(false);
>
>         return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>                 .runVertexCentricIteration(
>                         new VertexDistanceUpdater<K, Tuple2<Integer[], String>, Integer>(srcVertexId),
>                         new MinDistanceMessenger<K, Tuple2<Integer[], String>, Integer, NullValue>(srcVertexId),
>                         maxIterations, parameters);
>     }
>
> I'll send you the full code via a private e-mail.
>
> Cheers,
> Mihail
>
>
> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>
> Hi Mihail,
>
> Could you share your code, or at least the implementations of
> getVerticesDataSet() and InitVerticesMapper, so I can take a look?
> Where is InitVerticesMapper called above?
>
> Cheers,
> Vasia.
>
>
> On 26 June 2015 at 10:51, Mihail Vieru <vi...@informatik.hu-berlin.de>
> wrote:
>
>> Hi Robert,
>>
>> I'm using the same input data, as well as the same parameters I use in
>> the IDE's run configuration.
>> I don't run the job on the cluster (yet), but locally, by starting Flink
>> with the start-local.sh script.
>>
>>
>> I will try to explain my code a bit. The *Integer[]* array is
>> initialized in the *getVerticesDataSet()* method.
>>
>>     DataSet<Vertex<Integer, Tuple2<Integer[], String>>> vertices =
>>             getVerticesDataSet(env);
>>     ...
>>     Graph<Integer, Tuple2<Integer[], String>, NullValue> graph =
>>             Graph.fromDataSet(vertices, edges, env);
>>     ...
>>     Graph<Integer, Tuple2<Integer[], String>, NullValue> intermediateGraph =
>>             graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>
>>
>> In APSP I'm accessing it in the *InitVerticesMapper*, but the array is
>> now suddenly empty.
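>>
>> For clarity, a rough sketch of the general shape of
>> *getVerticesDataSet()* (the path, the tab-separated line format and the
>> size field are placeholders here, not the actual code):
>>
>>     import org.apache.flink.api.common.functions.MapFunction;
>>     import org.apache.flink.api.java.DataSet;
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>     import org.apache.flink.api.java.tuple.Tuple2;
>>     import org.apache.flink.graph.Vertex;
>>
>>     private static DataSet<Vertex<Integer, Tuple2<Integer[], String>>>
>>             getVerticesDataSet(ExecutionEnvironment env) {
>>         // verticesPath and the "<id>\t<label>" line format are placeholders
>>         return env.readTextFile(verticesPath)
>>                 .map(new MapFunction<String, Vertex<Integer, Tuple2<Integer[], String>>>() {
>>                     @Override
>>                     public Vertex<Integer, Tuple2<Integer[], String>> map(String line) {
>>                         String[] tokens = line.split("\t");
>>                         // the Integer[] in question is created here, per vertex
>>                         Integer[] distances = new Integer[numVertices];
>>                         return new Vertex<Integer, Tuple2<Integer[], String>>(
>>                                 Integer.parseInt(tokens[0]),
>>                                 new Tuple2<Integer[], String>(distances, tokens[1]));
>>                     }
>>                 });
>>     }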
>>
>> Best,
>> Mihail
>>
>>
>> On 26.06.2015 10:00, Robert Metzger wrote:
>>
>> Hi Mihail,
>>
>> the ArrayIndexOutOfBoundsException has been thrown from
>> *graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)*. I guess that
>> is code written by you or by a library you are using.
>> Maybe the data you are using on the cluster is different from your local
>> test data?
>>
>> Best,
>> Robert
>>
>>
>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru
>> <vi...@informatik.hu-berlin.de> wrote:
>>
>>> Hi,
>>>
>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR
>>> via the CLI.
>>> This doesn't occur in the IDE.
>>>
>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>> configuration Robert provided here:
>>>
>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>> I specify the entry point using the "-c" option.
>>>
>>> The array the exception refers to is actually initialized when the
>>> vertices dataset is read from the file system.
>>>
>>> Any ideas on what could cause this issue?
>>>
>>> Best,
>>> Mihail
>>>
>>> P.S.: the stack trace:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>
>>
>>
>
>
