BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not lazy: it removes the cached blocks immediately rather than waiting for the next action, which is somewhat confusing... A quick sketch of the asymmetry is below.
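A minimal local-mode sketch of this cache/unpersist asymmetry (the app name and data are just illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object CacheVsUnpersist extends App {
      val sc = new SparkContext(
        new SparkConf().setAppName("cache-vs-unpersist").setMaster("local[*]"))

      val rdd = sc.parallelize(1 to 1000000)

      rdd.cache()     // lazy: merely marks the RDD for caching, nothing runs yet
      rdd.count()     // first action: computes the RDD and populates the cache
      rdd.count()     // served from the cache

      rdd.unpersist() // eager: cached blocks are dropped right away, no action needed
      rdd.count()     // recomputes from scratch

      sc.stop()
    }
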
On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian <rhythm.m...@gmail.com> wrote:

> RDD.cache() is a lazy operation; the method itself doesn't perform the
> cache operation, it just asks the Spark runtime to cache the content of
> the RDD when the first action is invoked. In your case, the first action
> is the first count() call, which conceptually does 3 things:
>
> 1. Performs the HBase scan
> 2. Counts all the elements
> 3. Caches the RDD content
>
>
> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>
>> I did try with 'hBaseRDD.cache()', but I don't see any improvement.
>>
>> My expectation is that with cache enabled, there should not be any
>> penalty for the 'hBaseRDD.count' call.
>>
>>
>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>>> Yes, you're initiating a scan for each count call. The normal way to
>>> improve this would be to use cache(), which is what you have in your
>>> commented-out line:
>>>
>>>     // hBaseRDD.cache()
>>>
>>> If you uncomment that line, you should see an improvement overall.
>>>
>>> If caching is not an option for some reason (maybe the data is too
>>> large), then you can implement an overall count in your readFields
>>> method using accumulators:
>>>
>>>     val count = sc.accumulator(0L)
>>>
>>> In your flatMap function, do count += 1 for each row (regardless of
>>> whether it is "interesting" or not).
>>>
>>> In your main method, after doing an action (e.g. count in your case),
>>> call val totalCount = count.value.
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>>
>>>> I have code which reads an HBase table and counts the number of rows
>>>> containing a field.
>>>>
>>>>     def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
>>>>       rdd.flatMap(kv => {
>>>>         // Set of interesting keys for this use case
>>>>         val keys = List("src")
>>>>         var data = List[Array[Byte]]()
>>>>         var usefulRow = false
>>>>
>>>>         val cf = Bytes.toBytes("cf")
>>>>         keys.foreach { key =>
>>>>           val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>           if (col != null)
>>>>             usefulRow = true
>>>>           data = data :+ col // appended even when null, one slot per key
>>>>         }
>>>>
>>>>         // Keep the row only if at least one interesting key was present
>>>>         if (usefulRow)
>>>>           Some(data)
>>>>         else
>>>>           None
>>>>       })
>>>>     }
>>>>
>>>>     def main(args: Array[String]) {
>>>>       val hBaseRDD = init(args)
>>>>       // hBaseRDD.cache()
>>>>
>>>>       println("**** Initial row count " + hBaseRDD.count())
>>>>       println("**** Rows with interesting fields " + readFields(hBaseRDD).count())
>>>>     }
>>>>
>>>> I am running on a one-node CDH installation.
>>>>
>>>> As it is, it takes around 2.5 minutes. But if I comment out
>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes
>>>> around 1.5 minutes.
>>>>
>>>> Is it doing the HBase scan twice, once for each 'count' call? How do I
>>>> improve it?
>>>>
>>>> Thanks,
>>>> -Soumitra.
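
Putting Nick's accumulator suggestion together, here is a minimal sketch of the single-scan version (the readFieldsWithTotal name and the single "src" key are illustrative; init(args) is the poster's HBase setup from the original code):

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Counts every row and extracts the interesting ones in a single HBase
    // scan: the accumulator is bumped inside flatMap, so one count() action
    // yields both totals.
    def readFieldsWithTotal(sc: SparkContext,
                            rdd: RDD[(ImmutableBytesWritable, Result)]) = {
      val total = sc.accumulator(0L) // incremented for every row scanned

      val interesting = rdd.flatMap { kv =>
        total += 1L // counted whether or not the row is interesting
        val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
        if (col != null) Some(List(col)) else None
      }

      (interesting, total)
    }

    // Usage, assuming init(args) builds the HBase RDD as in the original post:
    // val (interesting, total) = readFieldsWithTotal(sc, init(args))
    // println("**** Rows with interesting fields " + interesting.count()) // one scan
    // println("**** Initial row count " + total.value) // read only after the action

Note that the accumulator is only populated once an action has run, and re-running the action would add to it again, so read total.value exactly once, after the count().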