Hi Gerard,

How are you starting Spark? Are you allocating enough RAM for the processing? I think the default is 512 MB. Try the following and see if it helps (depending on the size of your dataset, you might not need all 8 GB):
$SPARK_HOME/bin/spark-shell \
  --master local[4] \
  --executor-memory 8G \
  --driver-memory 8G

Thank You,

Irving Duran

On Tue, Jan 10, 2017 at 12:20 PM, Gerard Casey <gerardhughca...@gmail.com> wrote:
> Hello everyone,
>
> I am creating a graph from `gz` compressed `json` files of `edge` and
> `vertices` type.
>
> I have put the files in a Dropbox folder [here][1].
>
> I load and map these `json` records to create the `vertices` and `edge`
> types required by `graphx` like this:
>
>     val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
>     val vertices = vertices_raw.rdd.map(row =>
>       (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
>     val verticesRDD: RDD[(VertexId, Long)] = vertices
>     val edges_raw = sqlContext.read.json("path/edges.json.gz")
>     val edgesRDD = edges_raw.rdd.map(row => Edge(
>       row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
>       row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
>       row.getAs[Double]("length")))
>     val my_graph: Graph[Long, Double] =
>       Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
>
> I then use this `dijkstra` implementation I found to compute a shortest
> path between two vertices:
>
>     def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
>       var g2 = g.mapVertices(
>         (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
>       )
>       for (i <- 1L to g.vertices.count - 1) {
>         val currentVertexId: VertexId =
>           g2.vertices.filter(!_._2._1)
>             .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
>               (a, b) => if (a._2._2 < b._2._2) a else b)
>             ._1
>
>         val newDistances: VertexRDD[(Double, List[VertexId])] =
>           g2.aggregateMessages[(Double, List[VertexId])](
>             ctx => if (ctx.srcId == currentVertexId) {
>               ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
>             },
>             (a, b) => if (a._1 < b._1) a else b
>           )
>         g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
>           val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
>           (
>             vd._1 || vid == currentVertexId,
>             math.min(vd._2, newSumVal._1),
>             if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
>           )
>         })
>       }
>
>       g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
>         (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
>           .productIterator.toList.tail
>         ))
>     }
>
> I take two random vertex ids:
>
>     val v1 = 4000000028222916L
>     val v2 = 4000000031019012L
>
> and compute the path between them:
>
>     val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
>
> I am unable to compute this locally on my laptop without getting a
> stackoverflow error. I have 8 GB of RAM and a 2.6 GHz Intel Core i5
> processor, and I can see that it is using 3 of the 4 available cores. I
> can load this graph and compute on average around 10 shortest paths per
> second with the `igraph` library in Python on exactly the same graph. Is
> this an inefficient means of computing paths? At scale, on multiple nodes,
> the paths will compute (no stackoverflow error), but it is still 30-40
> seconds per path computation. I must be missing something.
>
> Thanks
>
> [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0
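P.S. One more thing worth trying if the extra memory alone doesn't do it: in the quoted dijkstra loop, g2 is rebuilt with outerJoinVertices on every iteration, so the RDD lineage keeps growing, and a very deep lineage is a common cause of StackOverflowError when everything runs in a single local JVM. Checkpointing the graph every so often truncates that lineage. This is only a rough sketch (not tested against your data; the helper name, checkpoint directory, and interval are placeholders I made up):

    import org.apache.spark.graphx.Graph

    // sc is the SparkContext you already have in spark-shell.
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // placeholder path; any local or HDFS dir

    // Hypothetical helper: every `every` iterations, cache and checkpoint the graph,
    // then run cheap actions so the checkpoint is materialized and the lineage is cut.
    def maybeCheckpoint[VD, ED](g: Graph[VD, ED], iter: Long, every: Int = 25): Graph[VD, ED] = {
      if (iter % every == 0) {
        g.cache()           // keep the data in memory so checkpointing doesn't recompute it
        g.checkpoint()      // mark the graph's vertices and edges for checkpointing
        g.vertices.count()  // force jobs so the checkpoint actually happens now
        g.edges.count()
      }
      g
    }

Inside the for-loop, right after the `g2 = g2.outerJoinVertices(...)` assignment, you would add `g2 = maybeCheckpoint(g2, i)`. The interval of 25 is arbitrary; a smaller value means more disk I/O but a shorter lineage.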