Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are
persisted during the computation using intermediateRDDStorageLevel (default
value is StorageLevel.MEMORY_AND_DISK) - see
here<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546>,
here<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548>,
and
here<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556>.
At the end of the ALS computation these RDDs are neither needed nor returned,
so the logical choice would seem to be to unpersist() them. Instead, the
cleanup is controlled by finalRDDStorageLevel: unpersist() is only called on
the intermediate RDDs if finalRDDStorageLevel != StorageLevel.NONE, which
seems counter-intuitive to me.
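
For concreteness, here is a minimal Python-style sketch of that lifecycle as I
read it. It only paraphrases the Scala control flow; the names are
illustrative, not the real implementation, and None stands in for Scala's
StorageLevel.NONE:

  from pyspark import StorageLevel

  def als_train_sketch(intermediate_rdds, final_rdd_storage_level=None):
      # During the computation, the intermediate RDDs are cached with
      # intermediateRDDStorageLevel (default MEMORY_AND_DISK):
      for rdd in intermediate_rdds:
          rdd.persist(StorageLevel.MEMORY_AND_DISK)
      # ... compute the user/item factors ...
      # At the end, cleanup only runs when a final storage level was set:
      if final_rdd_storage_level is not None:
          for rdd in intermediate_rdds:
              rdd.unpersist()
      # With final_rdd_storage_level == NONE (None here), the intermediate
      # RDDs are left in the cache.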

Jonathan

From: Burak Yavuz <brk...@gmail.com<mailto:brk...@gmail.com>>
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan 
<jonathan.stahl...@capitalone.com<mailto:jonathan.stahl...@capitalone.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist() with StorageLevel.NONE doesn't do anything, which
is why the unpersist() call has an if statement in front of it.
Could you give more information about your setup, please? Number of cores,
memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan 
<jonathan.stahl...@capitalone.com<mailto:jonathan.stahl...@capitalone.com>> 
wrote:
Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel =
StorageLevel.NONE, which I would understand to mean that the final factor
RDDs will not be persisted. Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only called on the intermediate RDDs (all the *Blocks RDDs
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn't make sense to me: I would expect the RDDs to be removed from the
cache when finalRDDStorageLevel == StorageLevel.NONE, not the other way around.
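
Spelled out in Python-style pseudocode (illustrative only; None again stands
in for StorageLevel.NONE):

  def cleanup_as_implemented(intermediates, final_level):
      # What the code currently does:
      if final_level is not None:       # finalRDDStorageLevel != NONE
          for rdd in intermediates:
              rdd.unpersist()

  def cleanup_as_expected(intermediates, final_level):
      # What I would have expected:
      if final_level is None:           # finalRDDStorageLevel == NONE
          for rdd in intermediates:
              rdd.unpersist()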

Jonathan


From: Stahlman, Jonathan 
<jonathan.stahl...@capitalone.com<mailto:jonathan.stahl...@capitalone.com>>
Date: Thursday, July 16, 2015 at 2:18 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and have been
studying its output with various model configurations. Ideally I would like
to run one job that trains the recommendation model with many different
configurations, to try to optimize for performance. Sample code in Python is
copied below.

The issue I have is that each newly trained model caches a set of RDDs, and
eventually the executors run out of memory. Is there any way in PySpark to
unpersist() these RDDs after each iteration? (One idea is sketched after the
sample code below.) The names of the RDDs, as gathered from the UI, are:

itemInBlocks
itemOutBlocks
products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  import itertools
  from pyspark.mllib.recommendation import ALS, Rating

  # randomSplit() normalizes the weights, so [99, 1, 1] splits ~99:1:1 (seed=2)
  data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
  functions = [rating]  # rating functions, defined elsewhere
  ranks = [10, 20]
  iterations = [10, 20]
  lambdas = [0.01, 0.1]
  alphas = [1.0, 50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
      functions, ranks, iterations, lambdas, alphas):
    # train model
    ratings_train = data_train.map(
        lambda l: Rating(l.user, l.product, ratingFunction(l)))
    model = ALS.trainImplicit(ratings_train, rank, numIterations,
                              lambda_=float(m_lambda), alpha=float(m_alpha))

    # test performance on CV data
    ratings_cv = data_cv.map(
        lambda l: Rating(l.user, l.product, ratingFunction(l)))
    auc = areaUnderCurve(ratings_cv, model.predictAll)  # defined elsewhere

    # save results as one CSV line per configuration
    result = ",".join(str(l) for l in
                      [ratingFunction.__name__, rank, numIterations,
                       m_lambda, m_alpha, auc])
    results.append(result)
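
The best workaround I can think of is to reach through py4j to the JVM-side
registry of cached RDDs between training runs and unpersist everything there.
Would something like the sketch below work? Note that sc._jsc is a private
attribute, so this relies on Spark/py4j internals that may change between
versions, and it clears everything in the cache, not just the ALS
intermediates:

  def unpersist_all_cached(sc):
      # Underlying Scala SparkContext, reached via the py4j gateway
      scala_sc = sc._jsc.sc()
      # scala.collection.Map[Int, RDD] of every RDD still marked cached
      it = scala_sc.getPersistentRDDs().iterator()
      while it.hasNext():
          pair = it.next()              # scala.Tuple2 of (id, rdd)
          pair._2().unpersist(False)    # non-blocking unpersist

I would call unpersist_all_cached(sc) at the bottom of the loop, right after
results.append(result).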
