Dear all, I am building a graph from two JSON files.
Spark version: 1.6.1

I am creating Edge and Vertex RDDs from JSON files.

The vertex JSON file looks like this:

    {"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 195333.973]}
    {"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 195316.627]}
    {"toid": "osgb4000000031043207", "index": 3, "point": [508172.075, 195325.719]}
    {"toid": "osgb4000000031043208", "index": 4, "point": [508513, 196023]}

    val vertices_raw = sqlContext.read.json("vertices.json.gz")
    val vertices = vertices_raw.rdd.map(row => ((row.getAs[String]("toid").stripPrefix("osgb").toLong), row.getAs[String]("index")))
    val verticesRDD: RDD[(VertexId, String)] = vertices

The edges JSON file looks like this:

    {"index": 1, "term": "Private Road - Restricted Access", "nature": "Single Carriageway", "negativeNode": "osgb4000000023183407", "toid": "osgb4000000023296573", "length": 112.8275895775762, "polyline": [492019.481, 156567.076, 492028, 156567, 492041.667, 156570.536, 492063.65, 156578.067, 492126.5, 156602], "positiveNode": "osgb4000000023183409"}
    {"index": 2, "term": "Private Road - Restricted Access", "nature": "Single Carriageway", "negativeNode": "osgb4000000023763485", "toid": "osgb4000000023296574", "length": 141.57731318733806, "polyline": [492144.493, 156762.059, 492149.35, 156750, 492195.75, 156630], "positiveNode": "osgb4000000023183408"}

    val edges_raw = sqlContext.read.json("edges.json.gz")
    val edgesRDD = edges_raw.rdd.map(row => (Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))

I have an edgesRDD that I can inspect:

    [IN] edgesRDD
    res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[19] at map at <console>:38

    [IN] edgesRDD.foreach(println)
    Edge(5000005125036254,5000005125036231,42.26548472559799)
    Edge(5000005125651333,5000005125651330,29.557979625165135)
    Edge(5000005125651329,5000005125651330,81.9310872300414)

I have a verticesRDD:

    [IN] verticesRDD
    res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at map at <console>:38

    [IN] verticesRDD.foreach(println)
    (5000005125651331,343722)
    (5000005125651332,343723)
    (5000005125651333,343724)

I then combine these to create a graph.

    [IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
    graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@303bbd02

I can inspect the edgesRDD within the graph object:

    [IN] graph.edges.foreach(println)
    Edge(5000005125774813,4000000029917080,72.9742898009203)
    Edge(5000005125774814,5000005125774813,49.87951589790352)
    Edge(5000005125775080,4000000029936370,69.62871049042008)

However, when I inspect the verticesRDD:

    [IN] graph.vertices.foreach(println)

Is there an issue with my graph construction?

    ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
    java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/08/17 12:27:16 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/08/17 12:27:16 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
        at $iwC$$iwC$$iwC.<init>(<console>:68)
        at $iwC$$iwC.<init>(<console>:70)
        at $iwC.<init>(<console>:72)
        at <init>(<console>:74)
        at .<init>(<console>:78)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

I have checked the VertexId requirements:

    VertexId = type Long
    A 64-bit vertex identifier that uniquely identifies a vertex within a graph.

The unique fields I have provided, for example, `5000005125036318` are satisfactory.

Can anyone shed some light on this?

Many thanks for reading so far ;)

G
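
One thing that may be worth checking, offered as a sketch rather than a confirmed diagnosis: the exception names `java.lang.Long` while the vertex attribute type is `String`, and the `index` field in the vertex JSON is an unquoted number. If the JSON reader infers `index` as a long, `row.getAs[String]("index")` would be an unchecked cast that only fails later, when GraphX stores the value in a `String` array. The snippet below assumes a Spark 1.6 shell with `sc` and `sqlContext` predefined, inlines two vertex records from above instead of reading `vertices.json.gz`, and uses a hypothetical edge with a made-up length purely so that a graph can be built.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumes a Spark 1.6 shell, where `sc` and `sqlContext` are predefined.
// Two of the vertex records from the question, inlined so no file is needed.
val sampleJson = sc.parallelize(Seq(
  """{"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 195333.973]}""",
  """{"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 195316.627]}"""
))
val verticesRaw = sqlContext.read.json(sampleJson)
verticesRaw.printSchema() // look at the type reported for `index`

val verticesRDD: RDD[(VertexId, String)] = verticesRaw.rdd.map { row =>
  val id = row.getAs[String]("toid").stripPrefix("osgb").toLong
  // Read `index` with its inferred numeric type, then convert explicitly.
  val label = row.getAs[Long]("index").toString
  (id, label)
}

// Hypothetical edge joining the two sample vertices; the length is made up.
val edgesRDD: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(4000000031043205L, 4000000031043206L, 1.0)
))

val graph: Graph[String, Double] = Graph(verticesRDD, edgesRDD)
graph.vertices.collect().foreach(println)
```

If `printSchema()` does report `index` as a long, the same explicit conversion could be applied to the original `vertices` mapping; alternatively, the graph could be declared as `Graph[Long, Double]` and the attribute kept numeric.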