Yes, I am using version 1.6. Thanks, Ted.
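
For anyone stuck on 1.6 in a long-lived shared SparkContext, one possible caller-side workaround is to record which RDDs were persistent before the job and unpersist whatever appeared during it. This is only a minimal sketch and not the fix applied in SPARK-12655. The names CcCleanup and componentsWithCleanup are hypothetical, and it assumes no other job persists RDDs on the same SparkContext while it runs.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.lib.ConnectedComponents
import scala.reflect.ClassTag

// Hypothetical helper, not part of GraphX.
object CcCleanup {

  /** Runs connected components, copies the result to the driver, and then
    * unpersists every RDD that became persistent during the call.
    * Assumption: nothing else persists RDDs on `sc` concurrently.
    */
  def componentsWithCleanup[VD: ClassTag, ED: ClassTag](
      sc: SparkContext,
      graph: Graph[VD, ED]): Array[(VertexId, VertexId)] = {
    // Ids of RDDs that were already persistent; these belong to the caller.
    val before = sc.getPersistentRDDs.keySet.toSet

    val cc = ConnectedComponents.run(graph)

    // Collect (vertexId, componentId) pairs before releasing anything,
    // because the result graph itself is among the newly cached RDDs.
    val result = cc.vertices.collect()

    // Release only the RDDs that were persisted during this call.
    sc.getPersistentRDDs
      .filter { case (id, _) => !before.contains(id) }
      .values
      .foreach(_.unpersist(blocking = false))

    result
  }
}
```

Collecting to the driver is only reasonable for small results. For large graphs the result could be written out instead before the cleanup step. The input graph stays cached, so the caller still decides when to call graph.unpersist on it.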

> Begin forwarded message:
> 
> From: Robin East <robin.e...@xense.co.uk>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 12, 2016 at 2:13:10 AM GMT+8
> To: zhang juntao <juntao.zhang...@gmail.com>
> Cc: Ted Yu <yuzhih...@gmail.com>, dev@spark.apache.org
> 
> This looks like https://issues.apache.org/jira/browse/SPARK-12655, fixed in 2.0.
> -------------------------------------------------------------------------------
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
> 
>> On 11 Apr 2016, at 07:23, zhang juntao <juntao.zhang...@gmail.com> wrote:
>> 
>> Thanks, Ted, for replying.
>> Those three lines cannot release the cache of the graph parameter; they only release g
>> ( graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() ).
>> In ConnectedComponents.scala, the graph parameter is cached via ccGraph and is
>> not released in Pregel:
>>   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
>>     val ccGraph = graph.mapVertices { case (vid, _) => vid }
>>     def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
>>       if (edge.srcAttr < edge.dstAttr) {
>>         Iterator((edge.dstId, edge.srcAttr))
>>       } else if (edge.srcAttr > edge.dstAttr) {
>>         Iterator((edge.srcId, edge.dstAttr))
>>       } else {
>>         Iterator.empty
>>       }
>>     }
>>     val initialMessage = Long.MaxValue
>>     Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
>>       vprog = (id, attr, msg) => math.min(attr, msg),
>>       sendMsg = sendMessage,
>>       mergeMsg = (a, b) => math.min(a, b))
>>   } // end of connectedComponents
>> }
>> thanks
>> juntao
>> 
>> 
>>> Begin forwarded message:
>>> 
>>> From: Ted Yu <yuzhih...@gmail.com>
>>> Subject: Re: spark graphx storage RDD memory leak
>>> Date: April 11, 2016 at 1:15:23 AM GMT+8
>>> To: zhang juntao <juntao.zhang...@gmail.com>
>>> Cc: dev@spark.apache.org
>>> 
>>> I see the following code toward the end of the method:
>>> 
>>>       // Unpersist the RDDs hidden by newly-materialized RDDs
>>>       oldMessages.unpersist(blocking = false)
>>>       prevG.unpersistVertices(blocking = false)
>>>       prevG.edges.unpersist(blocking = false)
>>> 
>>> Wouldn't the above achieve the same effect?
>>> 
>>> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao <juntao.zhang...@gmail.com> wrote:
>>> Hi experts,
>>> 
>>> I am reporting a problem with Spark GraphX. I use Zeppelin to submit Spark
>>> jobs; note that the Scala environment shares the same SparkContext and
>>> SQLContext instance. I call the connected components algorithm for some
>>> business logic, and found that every time a job finishes, some graph storage
>>> RDDs are not released. After several runs, many storage RDDs still exist even
>>> though all the jobs have finished.
>>> 
>>> <PastedGraphic-1.png>
>>> 
>>> So I checked the code of connectedComponents and found what may be a problem
>>> in Pregel.scala.
>>> When the graph parameter has been cached, there is no way to unpersist it,
>>> so I added the two unpersist calls on graph that follow the cache() line
>>> (shown in red font in the original message) to solve the problem:
>>> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>>>    (graph: Graph[VD, ED],
>>>     initialMsg: A,
>>>     maxIterations: Int = Int.MaxValue,
>>>     activeDirection: EdgeDirection = EdgeDirection.Either)
>>>    (vprog: (VertexId, VD, A) => VD,
>>>     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>>>     mergeMsg: (A, A) => A)
>>>   : Graph[VD, ED] =
>>> {
>>>   ......
>>>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>>>   graph.unpersistVertices(blocking = false)
>>>   graph.edges.unpersist(blocking = false)
>>>   ......
>>> 
>>> } // end of apply
>>> 
>>> I'm not sure whether this is a bug.
>>> Thank you for your time,
>>> juntao
>>> 
>>> 
>>> 
>> 
> 
