cache() is lazy: calling it only marks the RDD to be cached, and the data is actually stored when the first action (count in your case) runs. That first action still has to read everything from the source, so the first count takes about the same time whether or not cache is enabled. The second count on a cached RDD should take a lot less time, assuming the data fits in memory.
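
If you want to check this yourself, here is a minimal sketch. It assumes a local SparkContext and uses a small generated RDD as a stand-in for hBaseRDD; the names CacheTiming and timed are made up for illustration and are not part of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheTiming {
  // Hypothetical helper: run a block and return its result together with
  // the elapsed wall-clock time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, only for trying this out.
    val conf = new SparkConf().setAppName("cache-timing").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in for hBaseRDD; in the real job this would be the HBase scan.
      val rdd = sc.parallelize(1 to 1000000, 8).map(_ * 2L)

      rdd.cache() // lazy: nothing is computed or stored yet

      val (n1, t1) = timed(rdd.count()) // computes the RDD and fills the cache
      val (n2, t2) = timed(rdd.count()) // reads from the cache if it fit in memory

      println(s"first count:  $n1 rows in $t1 ms")
      println(s"second count: $n2 rows in $t2 ms")
    } finally {
      sc.stop()
    }
  }
}
```

Applied to your main method, that would mean the "Initial row count" call pays for the HBase scan, and readFields(hBaseRDD).count() should then read hBaseRDD from memory, provided the table fits.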

On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:

> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>
> My expectation is that with cache enabled, there should not be any penalty
> of 'hBaseRDD.count' call.
>
> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
>> Yes, you're initiating a scan for each count call. The normal way to
>> improve this would be to use cache(), which is what you have in your
>> commented out line:
>> // hBaseRDD.cache()
>>
>> If you uncomment that line, you should see an improvement overall.
>>
>> If caching is not an option for some reason (maybe data is too large),
>> then you can implement an overall count in your readFields method using
>> accumulators:
>>
>> val count = sc.accumulator(0L)
>> ...
>> In your flatMap function do count += 1 for each row (regardless of
>> whether "interesting" or not).
>>
>> In your main method after doing an action (e.g. count in your case), call
>> val totalCount = count.value.
>>
>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <kumar.soumi...@gmail.com>
>> wrote:
>>
>>> I have a code which reads an HBase table, and counts number of rows
>>> containing a field.
>>>
>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>> RDD[List[Array[Byte]]] = {
>>>   return rdd.flatMap(kv => {
>>>     // Set of interesting keys for this use case
>>>     val keys = List ("src")
>>>     var data = List[Array[Byte]]()
>>>     var usefulRow = false
>>>
>>>     val cf = Bytes.toBytes ("cf")
>>>     keys.foreach {key =>
>>>       val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>       if (col != null)
>>>         usefulRow = true
>>>       data = data :+ col
>>>     }
>>>
>>>     if (usefulRow)
>>>       Some(data)
>>>     else
>>>       None
>>>   })
>>> }
>>>
>>> def main(args: Array[String]) {
>>>   val hBaseRDD = init(args)
>>>   // hBaseRDD.cache()
>>>
>>>   println("**** Initial row count " + hBaseRDD.count())
>>>   println("**** Rows with interesting fields " +
>>>     readFields(hBaseRDD).count())
>>> }
>>>
>>> I am running on a one mode CDH installation.
>>>
>>> As it is it takes around 2.5 minutes. But if I comment out
>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes around
>>> 1.5 minutes.
>>>
>>> Is it doing HBase scan twice, for both 'count' calls? How do I improve
>>> it?
>>>
>>> Thanks,
>>> -Soumitra.