Yes, you're initiating an HBase scan for each count() call: hBaseRDD is not cached, so every action recomputes it from the source. The usual way to avoid this is cache(), which is what your commented-out line does: // hBaseRDD.cache()
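A minimal sketch of the caching fix follows. It assumes Spark 2.x or later with spark-core on the classpath. To keep it runnable in local mode, the HBase RDD is replaced by a small parallelized collection of hypothetical Map rows, and `CachedCounts` and `run` are illustrative names only. cache() keeps partitions in memory only. Any partition that does not fit is recomputed (here, re-scanned) on the next action. persist(StorageLevel.MEMORY_AND_DISK) spills such partitions to disk instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CachedCounts {
  // Returns (initial row count, rows with interesting fields).
  def run(): (Long, Long) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cached-counts").setMaster("local[*]"))
    try {
      // Hypothetical stand-in for init(args): each row maps column name -> value.
      val rows = sc.parallelize(Seq(
        Map("src" -> "a"),
        Map("other" -> "b"),
        Map("src" -> "c"),
        Map.empty[String, String]))

      // Mark the RDD for caching before the first action runs.
      // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY).
      val cached = rows.cache()

      // First action: reads the source once and fills the cache.
      val total = cached.count()
      // Second action: reuses the cached partitions instead of re-reading the source.
      val interesting = cached.filter(_.contains("src")).count()

      cached.unpersist()
      (total, interesting)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (total, interesting) = run()
    println(s"**** Initial row count $total")
    println(s"**** Rows with interesting fields $interesting")
  }
}
```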
If you uncomment that line, you should see an overall improvement. If caching is not an option for some reason (maybe the data is too large), you can compute the total count inside your readFields pass using an accumulator: val count = sc.accumulator(0L) ...

In your flatMap function, do count += 1 for each row (regardless of whether it is "interesting" or not). In your main method, after running an action (e.g. count in your case), call val totalCount = count.value.

On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
> I have a code which reads an HBase table, and counts number of rows
> containing a field.
>
> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>     RDD[List[Array[Byte]]] = {
>   return rdd.flatMap(kv => {
>     // Set of interesting keys for this use case
>     val keys = List ("src")
>     var data = List[Array[Byte]]()
>     var usefulRow = false
>
>     val cf = Bytes.toBytes ("cf")
>     keys.foreach {key =>
>       val col = kv._2.getValue(cf, Bytes.toBytes(key))
>       if (col != null)
>         usefulRow = true
>       data = data :+ col
>     }
>
>     if (usefulRow)
>       Some(data)
>     else
>       None
>   })
> }
>
> def main(args: Array[String]) {
>   val hBaseRDD = init(args)
>   // hBaseRDD.cache()
>
>   println("**** Initial row count " + hBaseRDD.count())
>   println("**** Rows with interesting fields " +
>     readFields(hBaseRDD).count())
> }
>
>
> I am running on a one mode CDH installation.
>
> As it is it takes around 2.5 minutes. But if I comment out 'println("****
> Initial row count " + hBaseRDD.count())', it takes around 1.5 minutes.
>
> Is it doing HBase scan twice, for both 'count' calls? How do I improve it?
>
> Thanks,
> -Soumitra.
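Here is a minimal sketch of the accumulator approach suggested above. It assumes Spark 2.0 or later, where sc.longAccumulator replaces the 1.x-era sc.accumulator(0L). The HBase row type is swapped for a hypothetical Map[String, String] stand-in, and `AccumulatorCounts`, `Row` and `run` are illustrative names. Treat the result as approximate under failures. Spark only guarantees exactly-once accumulator updates inside actions. An update made in a transformation such as flatMap may be applied again if a task is re-executed, or if another action recomputes the same uncached RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object AccumulatorCounts {
  // Hypothetical stand-in for an HBase row: column name -> value.
  type Row = Map[String, String]

  // Same shape as readFields, but adds 1 to `seen` for every input row.
  def readFields(rdd: RDD[Row], seen: LongAccumulator): RDD[List[Option[String]]] = {
    val keys = List("src") // interesting columns for this use case
    rdd.flatMap { row =>
      seen.add(1L) // count every row, interesting or not
      val cols = keys.map(row.get) // one slot per key, None if the column is missing
      if (cols.exists(_.isDefined)) List(cols) else Nil
    }
  }

  // Returns (total rows, rows with interesting fields) from a single pass.
  def run(): (Long, Long) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("accumulator-counts").setMaster("local[*]"))
    try {
      val rows: RDD[Row] = sc.parallelize(Seq(
        Map("src" -> "a"),
        Map("other" -> "b"),
        Map("src" -> "c"),
        Map.empty[String, String]))

      // The accumulator is created on the driver and captured by the flatMap closure.
      val seen = sc.longAccumulator("rows seen")

      // count() is the action that actually runs the flatMap (and the scan).
      val interesting = readFields(rows, seen).count()
      // Only meaningful after the action above has completed.
      val totalCount: Long = seen.value
      (totalCount, interesting)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (total, interesting) = run()
    println(s"**** Initial row count $total")
    println(s"**** Rows with interesting fields $interesting")
  }
}
```

Because the total comes out of the same job that produces the interesting-row count, the source is read once even without cache().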