thanks ted for replying ,
these three lines can’t release param graph cache, it only release g ( 
graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() )
ConnectedComponents.scala param graph will cache in ccGraph and won’t be 
release in Pregel
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, 
ED] = {
    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, 
VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
  } // end of connectedComponents
}
thanks
juntao


> Begin forwarded message:
> 
> From: Ted Yu <yuzhih...@gmail.com>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 11, 2016 at 1:15:23 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>
> 
> I see the following code toward the end of the method:
> 
>       // Unpersist the RDDs hidden by newly-materialized RDDs
>       oldMessages.unpersist(blocking = false)
>       prevG.unpersistVertices(blocking = false)
>       prevG.edges.unpersist(blocking = false)
> 
> Wouldn't the above achieve same effect ?
> 
> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao <juntao.zhang...@gmail.com 
> <mailto:juntao.zhang...@gmail.com>> wrote:
> hi experts,
> 
> I’m reporting a problem about spark graphx, I use zeppelin submit spark jobs, 
> note that scala environment shares the same SparkContext, SQLContext instance,
> and I call  Connected components algorithm to do some Business,  
> found that every time when the job finished, some graph storage RDDs weren’t 
> bean released, 
> after several times there would be a lot of  storage RDDs existing even 
> through all the jobs have finished . 
> 
> 
> 
> So I check the code of connectedComponents  and find that may be a problem in 
> Pregel.scala .
> when param graph has been cached, there isn’t any way to unpersist,  
> so I add red font code to solve the problem
> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>    (graph: Graph[VD, ED],
>     initialMsg: A,
>     maxIterations: Int = Int.MaxValue,
>     activeDirection: EdgeDirection = EdgeDirection.Either)
>    (vprog: (VertexId, VD, A) => VD,
>     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>     mergeMsg: (A, A) => A)
>   : Graph[VD, ED] =
> {
>   ......
>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, 
> initialMsg)).cache()
>   graph.unpersistVertices(blocking = false)
>   graph.edges.unpersist(blocking = false)
>   ......
> 
> } // end of apply
> 
> I'm not sure if this is a bug, 
> and thank you for your time,
> juntao
> 
> 
> 

Reply via email to