@Stephan: I don't think there is a way to deal with this. In my understanding, the (main) purpose of the user@ list is not to report Flink bugs. It is a forum for users to help each other. Flink committers happen to know a lot about the system, so it's easy for them to help users. It's also a good way to understand how users are using the system, which makes designing APIs and writing error messages and documentation easier. I'm very happy to see that more and more non-committers are joining discussions here and helping other users.
On Mon, Jun 29, 2015 at 11:05 AM, Stephan Ewen <se...@apache.org> wrote:

> Static fields are not part of the serialized program (by Java's
> definition). Whether a static field has the same value in the cluster
> JVMs depends on how it is initialized, i.e., whether it is initialized the
> same way in the shipped code, which runs without the program's main method.
>
> BTW: We are seeing more and more cases now where committers are spending a
> lot of time debugging reported Flink bugs that are actually user bugs.
> At some point we need to think of a way to deal with this, as more and
> more such things are reported...
>
> On Mon, Jun 29, 2015 at 11:02 AM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Thank you for the answer, Robert!
>>
>> I realize it's a single JVM running, yet I would expect programs to
>> behave in the same way, i.e. serialization to happen (even if not
>> necessary), in order to catch this kind of bug before cluster deployment.
>> Is this simply not possible, or is it a design choice we made for some
>> reason?
>>
>> -V.
>>
>> On 29 June 2015 at 09:53, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> It works in the IDE because there we execute everything in the same
>>> JVM, so the mapper can access the correct value of the static variable.
>>> When submitting a job with the CLI frontend, there are at least two JVMs
>>> involved, and code running in the JobManager/TaskManager cannot access
>>> the value of the static variable in the CLI frontend.
>>>
>>> On Sun, Jun 28, 2015 at 9:43 PM, Vasiliki Kalavri <
>>> vasilikikala...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Mihail and I have now solved the issue.
>>>>
>>>> The exception was caused because the array size in question was read
>>>> from a static field of the enclosing class, inside an anonymous mapper.
>>>> Making the mapper a standalone class and passing the array size to the
>>>> constructor solved the issue.
>>>>
>>>> What I don't understand, though, is why this worked fine when the job
>>>> was executed from inside the IDE. Is serialization handled differently
>>>> (skipped) in this case?
>>>>
>>>> Cheers,
>>>> Vasia.
>>>>
>>>> On 26 June 2015 at 11:30, Mihail Vieru <vi...@informatik.hu-berlin.de>
>>>> wrote:
>>>>
>>>>> Hi Vasia,
>>>>>
>>>>> InitVerticesMapper is called in the run method of APSP:
>>>>>
>>>>>     @Override
>>>>>     public Graph<K, Tuple2<Integer[],String>, NullValue> run(
>>>>>             Graph<K, Tuple2<Integer[],String>, NullValue> input) {
>>>>>
>>>>>         VertexCentricConfiguration parameters = new VertexCentricConfiguration();
>>>>>         parameters.setSolutionSetUnmanagedMemory(false);
>>>>>
>>>>>         return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
>>>>>                 .runVertexCentricIteration(
>>>>>                         new VertexDistanceUpdater<K, Tuple2<Integer[],String>, Integer>(srcVertexId),
>>>>>                         new MinDistanceMessenger<K, Tuple2<Integer[],String>, Integer, NullValue>(srcVertexId),
>>>>>                         maxIterations, parameters);
>>>>>     }
>>>>>
>>>>> I'll send you the full code via a private e-mail.
>>>>>
>>>>> Cheers,
>>>>> Mihail
>>>>>
>>>>> On 26.06.2015 11:10, Vasiliki Kalavri wrote:
>>>>>
>>>>> Hi Mihail,
>>>>>
>>>>> could you share your code, or at least the implementations of
>>>>> getVerticesDataSet() and InitVerticesMapper, so I can take a look?
>>>>> Where is InitVerticesMapper called above?
>>>>>
>>>>> Cheers,
>>>>> Vasia.
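
To make the failure mode described above concrete, here is a minimal sketch of the pattern Vasia and Stephan discuss. All names (StaticFieldPitfall, SizedMapper, arraySize) are hypothetical and simplified, not the original APSP code:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class StaticFieldPitfall {

        // Set in main(). main() runs only in the client JVM; in the cluster
        // JVMs this field keeps its default value 0.
        private static int arraySize;

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            arraySize = 10; // visible only in the JVM where main() actually ran

            DataSet<Integer> input = env.fromElements(1, 2, 3);

            // Broken on a cluster: the anonymous mapper reads the static field,
            // which is not part of the serialized function object, so the
            // TaskManager sees arraySize == 0 and allocates an empty array.
            DataSet<Integer[]> broken = input.map(new MapFunction<Integer, Integer[]>() {
                @Override
                public Integer[] map(Integer value) {
                    return new Integer[arraySize];
                }
            });

            // Fix: pass the value through the constructor so it is serialized
            // together with the function instance and shipped to the cluster.
            DataSet<Integer[]> fixed = input.map(new SizedMapper(arraySize));

            fixed.print();
        }

        // Standalone class: its instance fields travel with the serialized function.
        public static class SizedMapper implements MapFunction<Integer, Integer[]> {
            private final int size;

            public SizedMapper(int size) {
                this.size = size;
            }

            @Override
            public Integer[] map(Integer value) {
                return new Integer[size];
            }
        }
    }

In the IDE everything runs in one JVM, so both variants behave the same; with start-local.sh plus CLI submission, the client and the TaskManager are separate JVMs, and only the constructor-parameter variant sees the intended value.
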
>>>>>
>>>>> On 26 June 2015 at 10:51, Mihail Vieru <vi...@informatik.hu-berlin.de>
>>>>> wrote:
>>>>>
>>>>>> Hi Robert,
>>>>>>
>>>>>> I'm using the same input data, as well as the same parameters I use
>>>>>> in the IDE's run configuration.
>>>>>> I don't run the job on the cluster (yet), but locally, by starting
>>>>>> Flink with the start-local.sh script.
>>>>>>
>>>>>> I will try to explain my code a bit. The Integer[] array is
>>>>>> initialized in the getVerticesDataSet() method.
>>>>>>
>>>>>>     DataSet<Vertex<Integer, Tuple2<Integer[],String>>> vertices =
>>>>>>             getVerticesDataSet(env);
>>>>>>     ...
>>>>>>     Graph<Integer, Tuple2<Integer[],String>, NullValue> graph =
>>>>>>             Graph.fromDataSet(vertices, edges, env);
>>>>>>     ...
>>>>>>     Graph<Integer, Tuple2<Integer[],String>, NullValue> intermediateGraph =
>>>>>>             graph.run(new APSP<Integer>(srcVertexId, maxIterations));
>>>>>>
>>>>>> In APSP I'm accessing it in the InitVerticesMapper, but it is now
>>>>>> suddenly empty.
>>>>>>
>>>>>> Best,
>>>>>> Mihail
>>>>>>
>>>>>> On 26.06.2015 10:00, Robert Metzger wrote:
>>>>>>
>>>>>> Hi Mihail,
>>>>>>
>>>>>> the NPE has been thrown from
>>>>>> graphdistance.APSP$InitVerticesMapper.map(APSP.java:74). I guess
>>>>>> that is code written by you or a library you are using.
>>>>>> Maybe the data you are using on the cluster is different from your
>>>>>> local test data?
>>>>>>
>>>>>> Best,
>>>>>> Robert
>>>>>>
>>>>>> On Thu, Jun 25, 2015 at 7:41 PM, Mihail Vieru <
>>>>>> vi...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I get an ArrayIndexOutOfBoundsException when I run my job from a JAR
>>>>>>> in the CLI.
>>>>>>> This doesn't occur in the IDE.
>>>>>>>
>>>>>>> I've built the JAR using the "maven-shade-plugin" and the pom.xml
>>>>>>> configuration Robert has provided here:
>>>>>>> https://stackoverflow.com/questions/30102523/linkage-failure-when-running-apache-flink-jobs
>>>>>>> I specify the entry point using the "-c" option.
>>>>>>>
>>>>>>> The array the Exception refers to is actually initialized when a
>>>>>>> vertices dataset is read from the file system.
>>>>>>>
>>>>>>> Any ideas on what could cause this issue?
>>>>>>>
>>>>>>> Best,
>>>>>>> Mihail
>>>>>>>
>>>>>>> P.S.: the stack trace:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>>>>     at graphdistance.KAPSPNaiveJob.main(KAPSPNaiveJob.java:56)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>>>>     at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880)
>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
>>>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:74)
>>>>>>>     at graphdistance.APSP$InitVerticesMapper.map(APSP.java:48)
>>>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:389)
>>>>>>>     at org.apache.flink.graph.Graph$2.map(Graph.java:387)
>>>>>>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
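
For reference, a sketch of what the standalone InitVerticesMapper from the fix above might look like. Only the class name, the Tuple2<Integer[],String> vertex value type, and the constructor-parameter fix come from the thread; the field names and the map body are assumptions:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Vertex;

    // Standalone mapper: its instance fields are serialized with the function
    // and shipped to the cluster, unlike a static field of the enclosing class.
    public class InitVerticesMapper<K>
            implements MapFunction<Vertex<K, Tuple2<Integer[], String>>, Tuple2<Integer[], String>> {

        private final K srcVertexId;   // kept to mirror the original call site
        private final int arraySize;   // passed in, not read from a static field

        public InitVerticesMapper(K srcVertexId, int arraySize) {
            this.srcVertexId = srcVertexId;
            this.arraySize = arraySize;
        }

        @Override
        public Tuple2<Integer[], String> map(Vertex<K, Tuple2<Integer[], String>> vertex) {
            // Hypothetical body: allocate the distance array at the size that
            // was shipped with this instance, keeping the rest of the value.
            Integer[] distances = new Integer[arraySize];
            return new Tuple2<Integer[], String>(distances, vertex.getValue().f1);
        }
    }

The call site in APSP's run method would then become input.mapVertices(new InitVerticesMapper<K>(srcVertexId, arraySize)), with arraySize handed to APSP's constructor instead of being read from a static field of the enclosing class.
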