I find them both somewhat confusing, actually.
* RDD.cache() is lazy, and mutates the RDD in place
* RDD.unpersist() takes effect immediately, unloading the cached blocks, and
also mutates the RDD in place to disable future lazy caching

I have found that if I need to unload an RDD from memory, but still want it
to be cached again in the future, I need to do:
rdd.unpersist().cache()
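
For example, a minimal sketch of that lifecycle (the textFile source and
file name are just illustrative):

    val rdd = sc.textFile("data.txt").map(_.length)
    rdd.cache()       // lazy: marks the RDD, nothing is materialized yet
    rdd.count()       // first action: computes the RDD and caches it
    rdd.unpersist()   // eager: drops the cached blocks now and disables
                      // future caching
    rdd.cache()       // re-enables lazy caching
    rdd.count()       // recomputes from the source, then caches again

Since unpersist() returns the RDD itself, the last two steps chain into the
rdd.unpersist().cache() one-liner above.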

On Tue, Feb 25, 2014 at 6:50 AM, Cheng Lian <rhythm.m...@gmail.com> wrote:

> BTW, unlike RDD.cache(), the reverse operation RDD.unpersist() is not
> lazy, which is somewhat confusing...
>
>
> On Tue, Feb 25, 2014 at 7:48 PM, Cheng Lian <rhythm.m...@gmail.com> wrote:
>
>> RDD.cache() is a lazy operation: the method itself doesn't perform the
>> caching, it just asks the Spark runtime to cache the contents of the RDD
>> when the first action is invoked.  In your case, the first action is the
>> first count() call, which conceptually does 3 things:
>>
>>    1. Performs the HBase scan
>>    2. Counts all the elements
>>    3. Caches the RDD content
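
To make those three steps concrete, here is a minimal sketch, assuming
hBaseRDD as defined in Soumitra's code further down the thread:

    hBaseRDD.cache()               // lazy: only marks the RDD as cacheable
    val first = hBaseRDD.count()   // first action: scans HBase, counts the
                                   // elements, caches the partitions
    val second = hBaseRDD.count()  // second action: served from the cache,
                                   // no new HBase scan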
>>
>>
>>
>> On Tue, Feb 25, 2014 at 3:38 PM, Soumitra Kumar <kumar.soumi...@gmail.com
>> > wrote:
>>
>>> I did try 'hBaseRDD.cache()', but didn't see any improvement.
>>>
>>> My expectation is that with caching enabled, there should not be any
>>> penalty for the 'hBaseRDD.count' call.
>>>
>>>
>>>
>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
>>>> Yes, you're initiating a scan for each count call. The normal way to
>>>> improve this would be to use cache(), which is what you have in your
>>>> commented-out line:
>>>> // hBaseRDD.cache()
>>>>
>>>> If you uncomment that line, you should see an improvement overall.
>>>>
>>>> If caching is not an option for some reason (maybe the data is too
>>>> large), then you can implement an overall count in your readFields
>>>> method using accumulators:
>>>>
>>>> val count = sc.accumulator(0L)
>>>> ...
>>>> In your flatMap function, do count += 1 for each row (regardless of
>>>> whether it is "interesting" or not).
>>>>
>>>> In your main method, after performing an action (e.g. count in your
>>>> case), read val totalCount = count.value.
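
Putting those pieces together, here is a minimal sketch of the accumulator
variant, assuming sc and hBaseRDD are in scope and the Spark 0.9-era
accumulator API; readFieldsWithCount is a hypothetical name, and the body
simplifies the readFields code below to a single "src" column:

    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.rdd.RDD

    // Accumulator lives on the driver; tasks only add to it.
    val count = sc.accumulator(0L)

    def readFieldsWithCount(rdd: RDD[(ImmutableBytesWritable, Result)])
        : RDD[List[Array[Byte]]] =
        rdd.flatMap { kv =>
            count += 1L  // tally every row, "interesting" or not
            val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
            if (col != null) Some(List(col)) else None
        }

    // A single action filters and, as a side effect, tallies all rows:
    val interesting = readFieldsWithCount(hBaseRDD).count()
    val totalCount = count.value  // read on the driver after the action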
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>>> kumar.soumi...@gmail.com> wrote:
>>>>
>>>>> I have code which reads an HBase table and counts the number of rows
>>>>> containing a given field.
>>>>>
>>>>>     def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
>>>>>         rdd.flatMap(kv => {
>>>>>             // Set of interesting keys for this use case
>>>>>             val keys = List("src")
>>>>>             var data = List[Array[Byte]]()
>>>>>             var usefulRow = false
>>>>>
>>>>>             val cf = Bytes.toBytes("cf")
>>>>>             keys.foreach { key =>
>>>>>                 val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>                 if (col != null)      // row is useful if any key is present
>>>>>                     usefulRow = true
>>>>>                 data = data :+ col    // note: appends col even when null
>>>>>             }
>>>>>
>>>>>             if (usefulRow) Some(data) else None
>>>>>         })
>>>>>     }
>>>>>
>>>>>     def main(args: Array[String]) {
>>>>>         val hBaseRDD = init(args)
>>>>>         // hBaseRDD.cache()
>>>>>
>>>>>         println("**** Initial row count " + hBaseRDD.count())
>>>>>         println("**** Rows with interesting fields " +
>>>>>                 readFields(hBaseRDD).count())
>>>>>     }
>>>>>
>>>>>
>>>>> I am running on a one-node CDH installation.
>>>>>
>>>>> As it is, it takes around 2.5 minutes; but if I comment out
>>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes
>>>>> around 1.5 minutes.
>>>>>
>>>>> Is it doing the HBase scan twice, once for each 'count' call? How do I
>>>>> improve it?
>>>>>
>>>>> Thanks,
>>>>> -Soumitra.
>>>>>
>>>>
>>>>
>>>
>>
>
