Dear all,

I am building a GraphX graph from two JSON files, creating the edge and
vertex RDDs from them.

Spark version: 1.6.1

The vertex JSON file looks like this:

        {"toid": "osgb4000000031043205", "index": 1, "point": [508180.748, 
195333.973]}
        {"toid": "osgb4000000031043206", "index": 2, "point": [508163.122, 
195316.627]}
        {"toid": "osgb4000000031043207", "index": 3, "point": [508172.075, 
195325.719]}
        {"toid": "osgb4000000031043208", "index": 4, "point": [508513, 196023]}

    val vertices_raw = sqlContext.read.json("vertices.json.gz")

    val vertices = vertices_raw.rdd.map(row =>
      (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[String]("index")))

    val verticesRDD: RDD[(VertexId, String)] = vertices
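
As a sanity check on the schema (since "index" is numeric in the JSON, I
would expect read.json to infer it as a long rather than a string), the
inferred field types can be printed like so:

    // Diagnostic sketch: print the schema that read.json inferred for the
    // vertex file. If "index" comes back as long, then
    // getAs[String]("index") is an unchecked (erased) cast over a
    // java.lang.Long.
    vertices_raw.printSchema()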

The edges JSON file looks like this:

        {"index": 1, "term": "Private Road - Restricted Access", "nature": 
"Single Carriageway", "negativeNode": "osgb4000000023183407", "toid": 
"osgb4000000023296573", "length": 112.8275895775762, "polyline": [492019.481, 
156567.076, 492028, 156567, 492041.667, 156570.536, 492063.65, 156578.067, 
492126.5, 156602], "positiveNode": "osgb4000000023183409"}
        {"index": 2, "term": "Private Road - Restricted Access", "nature": 
"Single Carriageway", "negativeNode": "osgb4000000023763485", "toid": 
"osgb4000000023296574", "length": 141.57731318733806, "polyline": [492144.493, 
156762.059, 492149.35, 156750, 492195.75, 156630], "positiveNode": 
"osgb4000000023183408"}

    val edges_raw = sqlContext.read.json("edges.json.gz")

    val edgesRDD = edges_raw.rdd.map(row =>
      Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
           row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
           row.getAs[Double]("length")))
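
As far as I understand, getAs[T] is only an erased cast and performs no
conversion, so an explicit annotation documents the intended element type
but cannot catch a mismatch at compile time. A minimal sketch:

    // Sketch: annotate the expected type of the edge RDD. Edge[Double]
    // matches the attribute produced by getAs[Double]("length") above,
    // since "length" is a JSON double.
    val typedEdgesRDD: RDD[Edge[Double]] = edgesRDD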

I have an edgesRDD that I can inspect:

    [IN] edgesRDD
    res10: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[19] at map at <console>:38

    [IN] edgesRDD.foreach(println)
    Edge(5000005125036254,5000005125036231,42.26548472559799)
    Edge(5000005125651333,5000005125651330,29.557979625165135)
    Edge(5000005125651329,5000005125651330,81.9310872300414)

I have a verticesRDD:

    [IN] verticesRDD
    res12: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[9] at map at <console>:38

    [IN] verticesRDD.foreach(println)
    (5000005125651331,343722)
    (5000005125651332,343723)
    (5000005125651333,343724)

I then combine these to create a graph.

    [IN] val graph: Graph[(String),Double] = Graph(verticesRDD, edgesRDD)
    graph: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@303bbd02
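
In case it is relevant: I know Graph.apply can also take a default attribute
for vertices that appear in edges but are missing from verticesRDD. A variant
I could try, with "missing" as an arbitrary placeholder:

    // Sketch: same construction, but with an explicit default attribute
    // for edge endpoints that have no entry in verticesRDD.
    val graph2: Graph[String, Double] =
      Graph(verticesRDD, edgesRDD, defaultVertexAttr = "missing")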

I can inspect the edgesRDD within the graph object:

    [IN] graph.edges.foreach(println)

    Edge(5000005125774813,4000000029917080,72.9742898009203)
    Edge(5000005125774814,5000005125774813,49.87951589790352)
    Edge(5000005125775080,4000000029936370,69.62871049042008)

However, when I inspect the verticesRDD:

    [IN] graph.vertices.foreach(println)

I get the following error. Is there an issue with my graph construction?

    ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 13)
    java.lang.ArrayStoreException: java.lang.Long
        at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
        at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap.setMerge(GraphXPrimitiveKeyOpenHashMap.scala:87)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:61)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$$anonfun$apply$5.apply(ShippableVertexPartition.scala:60)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:328)
        at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:325)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
        at org.apache.spark.graphx.VertexRDD.compute(VertexRDD.scala:71)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/08/17 12:27:16 WARN TaskSetManager: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        ... (stack trace identical to the one above)
    
    16/08/17 12:27:16 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 13, localhost): java.lang.ArrayStoreException: java.lang.Long
        ... (stack trace identical to the one above)
    
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        ... (REPL wrapper and spark-submit frames omitted)
    Caused by: java.lang.ArrayStoreException: java.lang.Long
        ... (stack trace identical to the one above)

I have checked the VertexId requirements:

    type VertexId = Long
    A 64-bit vertex identifier that uniquely identifies a vertex within a graph.

The IDs I am providing, for example 5000005125036318, fit within a Long, so
the vertex IDs themselves look fine.
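
One thing I notice: the exception is ArrayStoreException: java.lang.Long,
thrown while GraphX ships vertex attributes, and the vertex attributes
printed above (e.g. 343722) look like bare numbers rather than strings.
Could the cause be that read.json infers the numeric "index" field as a
long, so getAs[String]("index") hands GraphX a java.lang.Long that it then
tries to store into an Array[String]? If so, would an explicit conversion
along these lines be the right fix (just a sketch, assuming "index" really
is inferred as long):

    // Sketch: convert the attribute explicitly rather than relying on
    // getAs[String], which is an erased cast and performs no conversion.
    val verticesFixed: RDD[(VertexId, String)] = vertices_raw.rdd.map(row =>
      (row.getAs[String]("toid").stripPrefix("osgb").toLong,
       row.getAs[Long]("index").toString))

Or should I instead keep the attribute as a Long and build a
Graph[Long, Double]?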

Can anyone shed some light on this? 

Many thanks for reading so far ;)

G
