BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not lazy:
it takes effect immediately, which is somewhat confusing...
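For reference, the pattern this thread converges on looks roughly like the
sketch below (untested; assumes the same init()/readFields() as in the code
further down):

```scala
// Sketch only. cache() merely marks the RDD for caching; the data is
// materialized by the first action and reused by later ones.
// unpersist(), by contrast, drops the cached blocks eagerly.
val hBaseRDD = init(args)
hBaseRDD.cache()                                // lazy: nothing happens yet

val total = hBaseRDD.count()                    // first action: one HBase scan, then cached
val interesting = readFields(hBaseRDD).count()  // served from the cache, no rescan

hBaseRDD.unpersist()                            // eager: blocks dropped immediately
```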


On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian <rhythm.m...@gmail.com> wrote:

> RDD.cache() is a lazy operation, the method itself doesn't perform the
> cache operation, it just asks Spark runtime to cache the content of the RDD
> when the first action is invoked.  In your case, the first action is the
> first count() call, which conceptually does 3 things:
>
>    1. Performs the HBase scan
>    2. Counts all the elements
>    3. Caches the RDD content
>
>
>
> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar 
> <kumar.soumi...@gmail.com>wrote:
>
>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>
>> My expectation is that with cache enabled, there should not be any
>> penalty of 'hBaseRDD.count' call.
>>
>>
>>
>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Yes, you're initiating a scan for each count() call. The normal way to
>>> improve this would be to use cache(), which is what you have in your
>>> commented out line:
>>> // hBaseRDD.cache()
>>>
>>> If you uncomment that line, you should see an improvement overall.
>>>
>>> If caching is not an option for some reason (maybe data is too large),
>>> then you can implement an overall count in your readFields method using
>>> accumulators:
>>>
>>> val count = sc.accumulator(0L)
>>> ...
>>> In your flatMap function, do count += 1 for each row (regardless of
>>> whether it is "interesting" or not).
>>>
>>> In your main method after doing an action (e.g. count in your case),
>>> call val totalCount = count.value.
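A sketch of the accumulator variant Nick describes (sc.accumulator is the
pre-2.0 Spark API in use at the time of this thread; filterRow is a
hypothetical helper standing in for the existing filtering logic):

```scala
// Sketch only: count every scanned row via an accumulator while still
// emitting only the "interesting" ones, so a single action yields both numbers.
val count = sc.accumulator(0L)                  // driver-side total, updated by tasks

def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] =
    rdd.flatMap { kv =>
        count += 1                              // counts every row, interesting or not
        // ... existing filtering logic unchanged; filterRow is hypothetical ...
        filterRow(kv)                           // returns Option[List[Array[Byte]]]
    }

val interesting = readFields(hBaseRDD).count()  // the action that runs the job
val totalCount = count.value                    // read only after an action has run
```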
>>>
>>>
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
>>>> I have a code which reads an HBase table, and counts number of rows
>>>> containing a field.
>>>>
>>>>     def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>>> RDD[List[Array[Byte]]] = {
>>>>         return rdd.flatMap(kv => {
>>>>             // Set of interesting keys for this use case
>>>>             val keys = List ("src")
>>>>             var data = List[Array[Byte]]()
>>>>             var usefulRow = false
>>>>
>>>>             val cf = Bytes.toBytes ("cf")
>>>>             keys.foreach {key =>
>>>>                 val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>                 if (col != null)
>>>>                     usefulRow = true
>>>>                 data = data :+ col
>>>>             }
>>>>
>>>>             if (usefulRow)
>>>>                 Some(data)
>>>>             else
>>>>                 None
>>>>         })
>>>>     }
>>>>
>>>>     def main(args: Array[String]) {
>>>>         val hBaseRDD = init(args)
>>>>         // hBaseRDD.cache()
>>>>
>>>>         println("**** Initial row count " + hBaseRDD.count())
>>>>         println("**** Rows with interesting fields " +
>>>> readFields(hBaseRDD).count())
>>>>   }
>>>>
>>>>
>>>> I am running on a one-node CDH installation.
>>>>
>>>> As it is it takes around 2.5 minutes. But if I comment out
>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes around
>>>> 1.5 minutes.
>>>>
>>>> Is it doing the HBase scan twice, once for each 'count' call? How do I
>>>> improve it?
>>>>
>>>> Thanks,
>>>> -Soumitra.
>>>>
>>>
>>>
>>
>
