I find them both somewhat confusing, actually:

* RDD.cache() is lazy, and mutates the RDD in place.
* RDD.unpersist() has the direct effect of unloading, and also mutates the RDD in place to disable future lazy caching.

I have found that if I need to unload an RDD from memory, but still want it to be cached again in the future, I need to do: rdd.unpersist().cache()
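A minimal sketch of that pattern (assuming an existing SparkContext 'sc'; the input path is made up):

    // Sketch only: cache() is lazy, unpersist() is not.
    val rdd = sc.textFile("hdfs:///tmp/input").cache() // nothing is cached yet
    rdd.count()             // the first action materializes the cache

    rdd.unpersist().cache() // unpersist() drops the cached blocks immediately;
                            // since it returns the RDD itself, chaining .cache()
                            // re-enables lazy caching for later actions
    rdd.count()             // this action populates the cache again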
On Tue, Feb 25, 2014 at 6:50 AM, Cheng Lian <rhythm.m...@gmail.com> wrote:

> BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not
> lazy, which is somewhat confusing...
>
>
> On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian <rhythm.m...@gmail.com> wrote:
>
>> RDD.cache() is a lazy operation: the method itself doesn't perform the
>> cache operation, it just asks the Spark runtime to cache the content of
>> the RDD when the first action is invoked. In your case, the first action
>> is the first count() call, which conceptually does 3 things:
>>
>> 1. Performs the HBase scan
>> 2. Counts all the elements
>> 3. Caches the RDD content
>>
>>
>> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>
>>> I did try with 'hBaseRDD.cache()', but I don't see any improvement.
>>>
>>> My expectation is that with caching enabled, there should not be any
>>> penalty for the 'hBaseRDD.count' call.
>>>
>>>
>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>
>>>> Yes, you're initiating a scan for each count call. The normal way to
>>>> improve this would be to use cache(), which is what you have in your
>>>> commented-out line:
>>>>
>>>> // hBaseRDD.cache()
>>>>
>>>> If you uncomment that line, you should see an improvement overall.
>>>>
>>>> If caching is not an option for some reason (maybe the data is too
>>>> large), then you can implement an overall count in your readFields
>>>> method using accumulators:
>>>>
>>>> val count = sc.accumulator(0L)
>>>> ...
>>>>
>>>> In your flatMap function, do count += 1 for each row (regardless of
>>>> whether it is "interesting" or not).
>>>>
>>>> In your main method, after performing an action (e.g. count in your
>>>> case), call val totalCount = count.value.
>>>>
>>>>
>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>>>
>>>>> I have some code which reads an HBase table and counts the number of
>>>>> rows containing a field.
>>>>>
>>>>> import org.apache.hadoop.hbase.client.Result
>>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>>> import org.apache.hadoop.hbase.util.Bytes
>>>>> import org.apache.spark.rdd.RDD
>>>>>
>>>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>>>> RDD[List[Array[Byte]]] = {
>>>>>   rdd.flatMap(kv => {
>>>>>     // Set of interesting keys for this use case
>>>>>     val keys = List("src")
>>>>>     var data = List[Array[Byte]]()
>>>>>     var usefulRow = false
>>>>>
>>>>>     val cf = Bytes.toBytes("cf")
>>>>>     keys.foreach { key =>
>>>>>       val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>       if (col != null)
>>>>>         usefulRow = true
>>>>>       data = data :+ col
>>>>>     }
>>>>>
>>>>>     if (usefulRow)
>>>>>       Some(data)
>>>>>     else
>>>>>       None
>>>>>   })
>>>>> }
>>>>>
>>>>> def main(args: Array[String]) {
>>>>>   val hBaseRDD = init(args)
>>>>>   // hBaseRDD.cache()
>>>>>
>>>>>   println("**** Initial row count " + hBaseRDD.count())
>>>>>   println("**** Rows with interesting fields " +
>>>>>     readFields(hBaseRDD).count())
>>>>> }
>>>>>
>>>>> I am running on a one-node CDH installation.
>>>>>
>>>>> As it is, it takes around 2.5 minutes. But if I comment out
>>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes
>>>>> around 1.5 minutes.
>>>>>
>>>>> Is it doing the HBase scan twice, once for each 'count' call? How do
>>>>> I improve it?
>>>>>
>>>>> Thanks,
>>>>> -Soumitra.
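For reference, a minimal sketch of the accumulator approach Nick describes, collapsed to the single "src" key for brevity. 'sc' and 'hBaseRDD' are assumed from Soumitra's setup, and sc.accumulator is the accumulator API of that Spark era:

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext._ // brings in the implicit AccumulatorParam
    import org.apache.spark.rdd.RDD

    // Count every scanned row as a side effect, so one action over the
    // filtered RDD yields both totals with a single HBase scan.
    val totalRows = sc.accumulator(0L)

    def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] =
      rdd.flatMap { kv =>
        totalRows += 1L // counts the row whether "interesting" or not
        val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
        if (col != null) Some(List(col)) else None
      }

    val interesting = readFields(hBaseRDD).count()       // the only action: one scan
    println("**** Initial row count " + totalRows.value) // read after the action
    println("**** Rows with interesting fields " + interesting)

One caveat: accumulator updates made inside transformations can be double-counted if a task is retried, so treat the total as approximate in the face of failures.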