Hi,
I tried some quick and simple tests, and it seems to me the vertices below
were correctly cached.
Could you point out the differences between my code and yours?
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt").cache

val g2 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
  .cache
g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => msg.getOrElse(0))
  .mapVertices((vid, d) => Prog.processInt(d))
  .cache
g3.vertices.count
'g2.vertices.toDebugString' outputs:
----
(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
| VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at
VertexRDDImpl.scala:121 []
| CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
| VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
| CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
| MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
| ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
+-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
| GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD,
EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...
'g3.vertices.toDebugString' outputs:
----
(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
| VertexRDD MapPartitionsRDD[32] at mapPartitions at
VertexRDDImpl.scala:96 []
| CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
| VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at
VertexRDDImpl.scala:121 []
| CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
| VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
| CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
| MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
| ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
+-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPar...
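FWIW, the re-calculation behavior matches ordinary lazy-evaluation semantics.
Here is a minimal plain-Scala analogy (not the actual GraphX code path): a lazy
view re-runs its mapping closure on every traversal, much like an uncached RDD
lineage, while forcing the result into a Vector materializes the derived values
once:

```scala
// Analogy only: a Scala view is lazy like an uncached RDD lineage,
// while a forced Vector behaves like a materialized (cached) result.
object LazyVsMaterialized {
  // How many times the mapping closure runs after two traversals of a view.
  def viewCalls(): Int = {
    var calls = 0
    val lazyView = (1 to 3).view.map { i => calls += 1; i * 2 }
    lazyView.sum; lazyView.sum   // the closure re-runs on each traversal
    calls
  }

  // How many times the closure runs after two reads of a forced Vector.
  def vectorCalls(): Int = {
    var calls = 0
    val cached = (1 to 3).map { i => calls += 1; i * 2 }.toVector
    cached.sum; cached.sum       // reads reuse the materialized values
    calls
  }

  def main(args: Array[String]): Unit = {
    println(s"view: ${viewCalls()} calls, vector: ${vectorCalls()} calls")
  }
}
```

In this analogy, materializing into a Vector plays the role that cache()
followed by an action plays for an RDD: without a materialized copy, each
downstream traversal drives the closure again.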
-- maropu
On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott <[email protected]> wrote:
> I changed the
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
> (vid, vertex, message) =>
> vertex.process(message.getOrElse(List[Message]()), ti)
> ).cache()
>
> to
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
> (vid, vertex, message) => (vertex,
> message.getOrElse(List[Message]()))
> ).mapVertices( (x,y) => y._1.process( y._2, ti ) ).cache()
>
> So the call to the 'process' method was moved out of the outerJoinVertices
> and into a separate mapVertices call, and the problem went away. Now,
> 'process' is only called once during the correct cycle.
> So it would appear that outerJoinVertices caches the closure, to be
> re-evaluated if needed again, while mapVertices actually caches the
> derived values.
>
> Is this a bug or a feature?
>
> Kyle
>
>
>
> On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott <[email protected]>
> wrote:
>
>> I'm trying to setup a simple iterative message/update problem in GraphX
>> (spark 1.2.0), but I'm running into issues with the caching and
>> re-calculation of data. I'm trying to follow the example found in the
>> Pregel implementation of materializing and caching messages and graphs and
>> then unpersisting them after the next cycle has been done.
>> It doesn't seem to be working, because every cycle gets progressively
>> slower and it seems as if more and more of the values are being
>> re-calculated despite my attempts to cache them.
>>
>> The code:
>> ```
>> var oldMessages: VertexRDD[List[Message]] = null
>> var oldGraph: Graph[MyVertex, MyEdge] = null
>> curGraph = curGraph.mapVertices((x, y) => y.init())
>> for (i <- 0 to cycle_count) {
>>   val curMessages = curGraph.aggregateMessages[List[Message]](x => {
>>     // send messages
>>     .....
>>   },
>>   (x, y) => {
>>     // collect messages into lists
>>     val out = x ++ y
>>     out
>>   }
>>   ).cache()
>>   curMessages.count()
>>   val ti = i
>>   oldGraph = curGraph
>>   curGraph = curGraph.outerJoinVertices(curMessages)(
>>     (vid, vertex, message) =>
>>       vertex.process(message.getOrElse(List[Message]()), ti)
>>   ).cache()
>>   curGraph.vertices.count()
>>   oldGraph.unpersistVertices(blocking = false)
>>   oldGraph.edges.unpersist(blocking = false)
>>   oldGraph = curGraph
>>   if (oldMessages != null) {
>>     oldMessages.unpersist(blocking = false)
>>   }
>>   oldMessages = curMessages
>> }
>> ```
>>
>> The MyVertex.process method takes the list of incoming messages, averages
>> them and returns a new MyVertex object. I've also set it up to append the
>> cycle number (the second argument) into a log file named after the vertex.
>> What ends up getting dumped into the log file for every vertex (in the
>> exact same pattern) is
>> ```
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 2
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 3
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 4
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 3
>> Cycle: 5
>> ```
>>
>> Any ideas about what I might be doing wrong with the caching, and how I
>> can avoid re-calculating so many of the values?
>>
>>
>> Kyle
>>
>>
>>
>
--
---
Takeshi Yamamuro