I have some code that reads an HBase table and counts the number of rows
containing a field.

def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
  rdd.flatMap(kv => {
    // Set of interesting keys for this use case
    val keys = List("src")
    var data = List[Array[Byte]]()
    var usefulRow = false
    val cf = Bytes.toBytes("cf")
    keys.foreach { key =>
      val col = kv._2.getValue(cf, Bytes.toBytes(key))
      if (col != null)
        usefulRow = true
      data = data :+ col
    }
    if (usefulRow)
      Some(data)
    else
      None
  })
}

def main(args: Array[String]) {
  val hBaseRDD = init(args)
  // hBaseRDD.cache()
  println("**** Initial row count " + hBaseRDD.count())
  println("**** Rows with interesting fields " +
    readFields(hBaseRDD).count())
}

I am running on a one-node CDH installation.
As written, the job takes around 2.5 minutes. But if I comment out the line
'println("**** Initial row count " + hBaseRDD.count())', it takes around
1.5 minutes.
Is it doing the HBase scan twice, once for each 'count' call? How can I
improve it?
Thanks,
-Soumitra.