This looks like https://issues.apache.org/jira/browse/SPARK-12655, fixed in 2.0.
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action
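Until you can move to 2.0, a caller-side workaround is to unpersist whatever you
can still reach once the algorithm returns. A minimal sketch, assuming an edge-list
input (the GraphLoader source and the names runCC/path are illustrative, not from
the original job):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.GraphLoader

    def runCC(sc: SparkContext, path: String): Unit = {
      val graph = GraphLoader.edgeListFile(sc, path).cache()
      val cc = graph.connectedComponents()
      cc.vertices.count() // materialize the result before releasing anything

      // Release the input and result graphs; both are reachable from the caller.
      graph.unpersistVertices(blocking = false)
      graph.edges.unpersist(blocking = false)
      cc.unpersistVertices(blocking = false)
      cc.edges.unpersist(blocking = false)
    }

Note this only releases what the caller can reach: the graph cached inside Pregel
(the subject of the report quoted below) is not visible from outside, which is
why it needs the upstream fix.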
> On 11 Apr 2016, at 07:23, zhang juntao <juntao.zhang...@gmail.com> wrote:
>
> Thanks Ted for replying.
> Those three lines can't release the param graph's cache; they only release g
> (graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()).
> In ConnectedComponents.scala the param graph is cached into ccGraph and won't
> be released in Pregel:
>
> def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
>   val ccGraph = graph.mapVertices { case (vid, _) => vid }
>   def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
>     if (edge.srcAttr < edge.dstAttr) {
>       Iterator((edge.dstId, edge.srcAttr))
>     } else if (edge.srcAttr > edge.dstAttr) {
>       Iterator((edge.srcId, edge.dstAttr))
>     } else {
>       Iterator.empty
>     }
>   }
>   val initialMessage = Long.MaxValue
>   Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
>     vprog = (id, attr, msg) => math.min(attr, msg),
>     sendMsg = sendMessage,
>     mergeMsg = (a, b) => math.min(a, b))
> } // end of connectedComponents
>
> Thanks,
> juntao
>
>> Begin forwarded message:
>>
>> From: Ted Yu <yuzhih...@gmail.com>
>> Subject: Re: spark graphx storage RDD memory leak
>> Date: April 11, 2016 at 1:15:23 AM GMT+8
>> To: zhang juntao <juntao.zhang...@gmail.com>
>> Cc: dev@spark.apache.org
>>
>> I see the following code toward the end of the method:
>>
>>   // Unpersist the RDDs hidden by newly-materialized RDDs
>>   oldMessages.unpersist(blocking = false)
>>   prevG.unpersistVertices(blocking = false)
>>   prevG.edges.unpersist(blocking = false)
>>
>> Wouldn't the above achieve the same effect?
>>
>> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao <juntao.zhang...@gmail.com> wrote:
>> Hi experts,
>>
>> I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
>> note that the Scala environment shares the same SparkContext and SQLContext
>> instance. I call the connected components algorithm as part of our business
>> logic, and I found that every time a job finished, some graph storage RDDs
>> weren't being released; after several runs a lot of storage RDDs remained
>> cached even though all the jobs had finished.
>>
>> <PastedGraphic-1.png>
>>
>> So I checked the code of connectedComponents and found what may be a problem
>> in Pregel.scala: once the param graph has been cached, there isn't any way to
>> unpersist it, so I added the two lines marked "// added" below (red in my
>> original mail) to solve the problem:
>>
>> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>>     (graph: Graph[VD, ED],
>>      initialMsg: A,
>>      maxIterations: Int = Int.MaxValue,
>>      activeDirection: EdgeDirection = EdgeDirection.Either)
>>     (vprog: (VertexId, VD, A) => VD,
>>      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>>      mergeMsg: (A, A) => A)
>>   : Graph[VD, ED] =
>> {
>>   ......
>>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>>   graph.unpersistVertices(blocking = false)  // added
>>   graph.edges.unpersist(blocking = false)    // added
>>   ......
>> } // end of apply
>>
>> I'm not sure if this is a bug.
>> Thank you for your time,
>> juntao
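To confirm whether the cached graph RDDs are actually being released after a job,
you can list what the driver still considers persisted instead of checking the
Storage tab by hand. A small check, assuming a live SparkContext named sc
(dumpPersistedRDDs is just an illustrative name):

    // Leaked GraphX RDDs show up as VertexRDD/EdgeRDD entries that
    // survive after the job has finished.
    def dumpPersistedRDDs(sc: org.apache.spark.SparkContext): Unit = {
      sc.getPersistentRDDs.foreach { case (id, rdd) =>
        println(s"RDD $id [${rdd.getStorageLevel.description}] $rdd")
      }
    }

Run it before and after calling connectedComponents; on an affected version the
VertexRDD/EdgeRDD entries keep accumulating across runs.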