Thanks, Ted, for replying. Those three lines can't release the cache of the graph parameter. They only release g (created by graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()) and the graphs and messages that later iterations derive from it.

In ConnectedComponents.scala, the graph passed to Pregel is ccGraph. It ends up cached, and nothing in Pregel or in run releases it:

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  // Label every vertex with its own id; this graph becomes Pregel's "graph" parameter.
  val ccGraph = graph.mapVertices { case (vid, _) => vid }
  def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
    if (edge.srcAttr < edge.dstAttr) {
      Iterator((edge.dstId, edge.srcAttr))
    } else if (edge.srcAttr > edge.dstAttr) {
      Iterator((edge.srcId, edge.dstAttr))
    } else {
      Iterator.empty
    }
  }
  val initialMessage = Long.MaxValue
  // ccGraph is never unpersisted, either inside Pregel or after it returns.
  Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
    vprog = (id, attr, msg) => math.min(attr, msg),
    sendMsg = sendMessage,
    mergeMsg = (a, b) => math.min(a, b))
} // end of connectedComponents

Thanks,
juntao
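One way to act on the point above without having Pregel unpersist a graph it does not own is to release ccGraph in ConnectedComponents itself, once Pregel has returned. This is a sketch under assumptions, not necessarily how GraphX resolves the issue: it copies run into a hypothetical object ConnectedComponentsSketch (not a GraphX API), assumes the GraphX signatures used in this thread (Pregel.apply with a named activeDirection, Graph.unpersist(blocking)), and needs spark-core and spark-graphx on the classpath.

```scala
import scala.reflect.ClassTag

import org.apache.spark.graphx.{EdgeDirection, EdgeTriplet, Graph, Pregel, VertexId}

// Hypothetical helper object; the names are illustrative and not part of GraphX.
object ConnectedComponentsSketch {

  // Same algorithm as ConnectedComponents.run, but it releases the intermediate
  // ccGraph (which this method creates and therefore owns) after Pregel returns.
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    // Start by labelling every vertex with its own id.
    val ccGraph = graph.mapVertices { case (vid, _) => vid }

    // Push the smaller label across any edge whose endpoints still disagree.
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
      if (edge.srcAttr < edge.dstAttr) Iterator((edge.dstId, edge.srcAttr))
      else if (edge.srcAttr > edge.dstAttr) Iterator((edge.srcId, edge.dstAttr))
      else Iterator.empty
    }

    val result = Pregel(ccGraph, Long.MaxValue, activeDirection = EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))

    // Release the intermediate graph. Unpersisting only drops cached blocks; if any
    // of them are needed again, Spark recomputes them from lineage, so results stay correct.
    ccGraph.unpersist(blocking = false)
    result
  }
}
```

Releasing ccGraph here rather than inside Pregel keeps ownership clear: run created ccGraph and can safely drop it, whereas a caller may have cached the graph it passes to Pregel on purpose and would not expect Pregel to unpersist it.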
> Begin forwarded message:
>
> From: Ted Yu <yuzhih...@gmail.com>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 11, 2016 at 1:15:23 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: "dev@spark.apache.org" <dev@spark.apache.org>
>
> I see the following code toward the end of the method:
>
>     // Unpersist the RDDs hidden by newly-materialized RDDs
>     oldMessages.unpersist(blocking = false)
>     prevG.unpersistVertices(blocking = false)
>     prevG.edges.unpersist(blocking = false)
>
> Wouldn't the above achieve the same effect?
>
> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao <juntao.zhang...@gmail.com> wrote:
> Hi experts,
>
> I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
> note that the Scala environment shares the same SparkContext and SQLContext instances.
> I call the connected components algorithm for some business logic, and found that
> every time a job finished, some graph storage RDDs weren't being released.
> After several runs there were a lot of storage RDDs left over, even though all
> the jobs had finished.
>
> So I checked the code of connectedComponents and found that the problem may be in
> Pregel.scala: when the graph parameter has been cached, there isn't any way to
> unpersist it. So I added the two lines marked "// added" below to solve the problem:
>
>     def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>         (graph: Graph[VD, ED],
>          initialMsg: A,
>          maxIterations: Int = Int.MaxValue,
>          activeDirection: EdgeDirection = EdgeDirection.Either)
>         (vprog: (VertexId, VD, A) => VD,
>          sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>          mergeMsg: (A, A) => A)
>       : Graph[VD, ED] =
>     {
>       ......
>       var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>       graph.unpersistVertices(blocking = false)  // added
>       graph.edges.unpersist(blocking = false)    // added
>       ......
>     } // end of apply
>
> I'm not sure if this is a bug.
> Thank you for your time,
> juntao
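Independently of any change inside GraphX, a caller that shares one SparkContext across jobs, as in the Zeppelin setup described above, could clean up after each run. This sketch assumes only Spark's public SparkContext.getPersistentRDDs and RDD.unpersist(blocking); PersistedRddCleanup and withCacheCleanup are hypothetical names, not Spark APIs.

```scala
import org.apache.spark.SparkContext

// Hypothetical helper object; the names are illustrative, not a Spark API.
object PersistedRddCleanup {

  // Runs `body`, then unpersists every RDD that became persistent while it ran,
  // including RDDs cached internally by GraphX algorithms such as Pregel.
  // RDDs that were already persisted before `body` started are left alone.
  def withCacheCleanup[T](sc: SparkContext)(body: => T): T = {
    val before = sc.getPersistentRDDs.keySet
    try body
    finally {
      sc.getPersistentRDDs.foreach { case (id, rdd) =>
        if (!before.contains(id)) rdd.unpersist(blocking = false)
      }
    }
  }
}
```

For example, PersistedRddCleanup.withCacheCleanup(sc) { graph.connectedComponents().vertices.collect() } returns the component labels on the driver and then drops the RDDs cached along the way. Two caveats: if body returns an RDD or Graph instead of collected data, its cached blocks are dropped too and Spark recomputes them on the next action; and with a shared SparkContext, RDDs that another note caches while body is running would also be unpersisted.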