foreachRDD is the underlying function that both print and saveAsTextFiles
are built on. If you are curious, here is the actual source:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L759
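
For reference, here is a simplified paraphrase of that method (the real
code adds a few extras; rddToFileName is the actual helper that builds
the per-batch file name, the rest is condensed for illustration):

    // Roughly what DStream.saveAsTextFiles does: wrap the save logic in a
    // function of (RDD, Time) and register it with foreachRDD, so that it
    // runs once per batch.
    def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = {
      val saveFunc = (rdd: RDD[T], time: Time) => {
        // rddToFileName generates "prefix-<batch time in ms>.suffix"
        val file = rddToFileName(prefix, suffix, time)
        rdd.saveAsTextFile(file)
      }
      this.foreachRDD(saveFunc)
    }

So writing the same loop yourself with foreachRDD costs essentially
nothing extra over calling saveAsTextFiles directly.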


Yes, since the RDD is used twice, it's probably best to call rdd.cache /
rdd.persist. In fact, you can do it at the DStream level:
MyDStream.persist() will ensure that all RDDs of MyDStream are
automatically cached/persisted. However, when one of the operations is
cheap enough (RDD.take(1) probably is), you can sometimes reduce memory
usage by not persisting, at the cost of slightly increased computation
time.
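
To make both options concrete, here is a sketch (the stream setup is
illustrative; wordCounts stands in for the DStream from your example):

    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    val conf = new SparkConf().setAppName("PersistSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Option 1: persist at the DStream level, so every RDD the stream
    // generates is cached automatically.
    wordCounts.persist()

    // Option 2: cache per batch inside foreachRDD, and release the memory
    // once the batch has been handled.
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      rdd.cache()  // avoid recomputing the RDD for take() and then save
      if (rdd.take(1).nonEmpty) {
        rdd.saveAsTextFile("wordcounts-" + time.milliseconds)
      }
      rdd.unpersist()
    }

    ssc.start()
    ssc.awaitTermination()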

TD


On Wed, Mar 26, 2014 at 12:18 PM, Diana Carroll <dcarr...@cloudera.com> wrote:

> Thanks, Tathagata, very helpful.  A follow-up question below...
>
>
> On Wed, Mar 26, 2014 at 2:27 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>>
>>
>> *Answer 3:* You can do something like
>> wordCounts.foreachRDD((rdd: RDD[...], time: Time) => {
>>    if (rdd.take(1).size == 1) {
>>       // There exists at least one element in RDD, so save it to file
>>       rdd.saveAsTextFile(<generate file name based on time>)
>>    }
>> })
>>
> Is calling foreachRDD and performing an operation on each RDD individually as
> efficient as performing the operation on the dstream?  Is this foreach
> pretty much what dstream.saveAsTextFiles is doing anyway?
>
> This also brings up a question I have about caching in the context of
> streaming.  In  this example, would I want to call rdd.cache()?  I'm
> calling two successive operations on the same rdd (take(1) and then
> saveAsTextFile)...if I were doing this in regular Spark I'd want to cache
> so I wouldn't need to re-calculate the rdd for both calls.  Does the same
> apply here?
>
> Thanks,
> Diana
>
