Hi Gerard,
How are you starting Spark? Are you allocating enough RAM for processing? I
think the default is 1 GB in recent releases (older ones used 512 MB). Try
doing the following and see if it helps (based on the size of your dataset,
you might not need all 8 GB).

$SPARK_HOME/bin/spark-shell \
  --master local[4] \
  --executor-memory 8G \
  --driver-memory 8G
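
To confirm the setting took effect, one option is to check the JVM heap from
inside the shell. This is only a sketch using the standard JVM `Runtime` call,
nothing Spark-specific; `maxHeapMb` is just a name I picked. With
`--master local[4]` everything runs inside the driver JVM, so `--driver-memory`
is the value that should show up here, and the reported figure is usually
somewhat below the requested size.

```scala
// Report the maximum heap the current JVM may use, in megabytes.
// Runtime.maxMemory is a plain JVM call, so this also works outside Spark.
val maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
println(s"Max JVM heap: $maxHeapMb MB")
```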


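On the speed question: the loop in your `dijkstra` runs `g.vertices.count - 1`
times, and every pass triggers at least one Spark job (the `fold` is an
action) while `g2` keeps accumulating lineage. I suspect that growing lineage
is what ends in the stack overflow, though I have not verified it on your
data. If the graph fits in memory on one machine, a priority-queue Dijkstra
with no Spark involved may be a fairer comparison with igraph. Below is a
minimal sketch in plain Scala; `shortestPath`, the adjacency-map layout, and
the toy graph are my own assumptions, not GraphX API.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of GraphX: Dijkstra over an in-memory adjacency map.
// adj maps a vertex id to its outgoing (neighbour, edge length) pairs; for an
// undirected network each edge would need to be listed in both directions.
def shortestPath(adj: Map[Long, Seq[(Long, Double)]],
                 origin: Long,
                 target: Long): Option[(Double, List[Long])] = {
  val dist = mutable.Map(origin -> 0.0)
  val prev = mutable.Map.empty[Long, Long]
  val done = mutable.Set.empty[Long]
  // PriorityQueue is a max-heap, so treat a larger distance as "smaller"
  // to dequeue the closest vertex first.
  val queue = mutable.PriorityQueue((0.0, origin))(
    Ordering.fromLessThan[(Double, Long)](_._1 > _._1))

  while (queue.nonEmpty) {
    val (d, u) = queue.dequeue()
    if (u == target) {
      // Walk the predecessor links back to the origin.
      var path = List(u)
      while (path.head != origin) path = prev(path.head) :: path
      return Some((d, path))
    }
    if (done.add(u)) { // skip stale queue entries for already settled vertices
      for ((v, w) <- adj.getOrElse(u, Seq.empty)) {
        val nd = d + w
        if (nd < dist.getOrElse(v, Double.MaxValue)) {
          dist(v) = nd
          prev(v) = u
          queue.enqueue((nd, v))
        }
      }
    }
  }
  None // target not reachable from origin
}

// Toy graph (made-up data): 1 -> 2 -> 3 is shorter than the direct edge 1 -> 3.
val toy = Map(
  1L -> Seq((2L, 1.0), (3L, 4.0)),
  2L -> Seq((3L, 1.5)),
  3L -> Seq.empty[(Long, Double)]
)
val result = shortestPath(toy, 1L, 3L)
println(result)
```

The adjacency map could be built by collecting your edges to the driver once;
whether that is practical depends on how large the full graph is.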

Thank You,

Irving Duran

On Tue, Jan 10, 2017 at 12:20 PM, Gerard Casey <gerardhughca...@gmail.com>
wrote:

> Hello everyone,
>
> I am creating a graph from `gz`-compressed `json` files that hold the `edge`
> and `vertices` records.
>
> I have put the files in a Dropbox folder [here][1].
>
> I load and map these `json` records to create the `vertices` and `edge`
> types required by `graphx` like this:
>
>     import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy, VertexId}
>     import org.apache.spark.rdd.RDD
>
>     val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
>     val vertices = vertices_raw.rdd.map(row =>
>       (row.getAs[String]("toid").stripPrefix("osgb").toLong, row.getAs[Long]("index")))
>     val verticesRDD: RDD[(VertexId, Long)] = vertices
>     val edges_raw = sqlContext.read.json("path/edges.json.gz")
>     val edgesRDD = edges_raw.rdd.map(row => Edge(
>       row.getAs[String]("positiveNode").stripPrefix("osgb").toLong,
>       row.getAs[String]("negativeNode").stripPrefix("osgb").toLong,
>       row.getAs[Double]("length")))
>     val my_graph: Graph[Long, Double] =
>       Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
>
> I then use this `dijkstra` implementation I found to compute a shortest
> path between two vertices:
>
>     def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
>       var g2 = g.mapVertices(
>         (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
>       )
>       for (i <- 1L to g.vertices.count - 1) {
>         val currentVertexId: VertexId = g2.vertices.filter(!_._2._1)
>           .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
>             (a, b) => if (a._2._2 < b._2._2) a else b)
>           ._1
>
>         val newDistances: VertexRDD[(Double, List[VertexId])] =
>           g2.aggregateMessages[(Double, List[VertexId])](
>             ctx => if (ctx.srcId == currentVertexId) {
>               ctx.sendToDst((ctx.srcAttr._2 + ctx.attr, ctx.srcAttr._3 :+ ctx.srcId))
>             },
>             (a, b) => if (a._1 < b._1) a else b
>           )
>         g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
>           val newSumVal = newSum.getOrElse((Double.MaxValue, List[VertexId]()))
>           (
>             vd._1 || vid == currentVertexId,
>             math.min(vd._2, newSumVal._1),
>             if (vd._2 < newSumVal._1) vd._3 else newSumVal._2
>           )
>         })
>       }
>
>       g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
>         (vd, dist.getOrElse((false, Double.MaxValue, List[VertexId]()))
>           .productIterator.toList.tail
>         ))
>     }
>
> I take two random vertex IDs:
>
>     val v1 = 4000000028222916L
>     val v2 = 4000000031019012L
>
> and compute the path between them:
>
>     val results = dijkstra(my_graph, v1).vertices.map(_._2).collect
>
> I am unable to compute this locally on my laptop without getting a
> stack overflow error. I have 8 GB of RAM and a 2.6 GHz Intel Core i5
> processor, and I can see that it is using 3 of the 4 available cores. With
> the `igraph` library in Python I can load exactly the same graph and compute
> around 10 shortest paths per second on average. Is this an inefficient means
> of computing paths? At scale, on multiple nodes, the paths do compute (no
> stack overflow error), but it still takes 30 to 40 seconds per path
> computation. I must be missing something.
>
> Thanks
>
>   [1]: https://www.dropbox.com/sh/9ug5ikr6j357q7j/AACDBR9UdM0g_ck_ykB8KXPXa?dl=0
>
