foreachRDD is the underlying function behind print and saveAsTextFiles. If you are curious, here is the actual source:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L759
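For reference, saveAsTextFiles in that same file is just a thin wrapper around foreachRDD. Paraphrased (details vary slightly across versions; rddToFileName builds a name like "<prefix>-<time in ms><suffix>"):

  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

So calling foreachRDD with your own save logic does essentially the same work, minus the automatic file naming.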
Yes, since the RDD is used twice, it's probably best to call rdd.cache() / rdd.persist(). In fact, you can do it at the DStream level: MyDStream.persist() will ensure that all RDDs of MyDStream are automatically cached/persisted. However, when one of the operations is cheap enough (RDD.take(1) is probably cheap), you can sometimes reduce memory usage by not persisting, at the cost of slightly increased computation time.

TD

On Wed, Mar 26, 2014 at 12:18 PM, Diana Carroll <dcarr...@cloudera.com> wrote:

> Thanks, Tathagata, very helpful. A follow-up question below...
>
> On Wed, Mar 26, 2014 at 2:27 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> *Answer 3:*
>> You can do something like
>>
>>   wordCounts.foreachRDD((rdd: RDD[...], time: Time) => {
>>     if (rdd.take(1).size == 1) {
>>       // There exists at least one element in the RDD, so save it to a file
>>       rdd.saveAsTextFile(<generate file name based on time>)
>>     }
>>   })
>>
> Is calling foreachRDD and performing an operation on each RDD individually as efficient as performing the operation on the DStream? Is this foreach pretty much what dstream.saveAsTextFiles is doing anyway?
>
> This also brings up a question I have about caching in the context of streaming. In this example, would I want to call rdd.cache()? I'm calling two successive operations on the same RDD (take(1) and then saveAsTextFile())... if I were doing this in regular Spark I'd want to cache so I wouldn't need to re-calculate the RDD for both calls. Does the same apply here?
>
> Thanks,
> Diana
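Putting the two answers together, here is a minimal self-contained sketch of the pattern. The object name, socket source, port, batch interval, and output prefix are made up for illustration; only the persist() + take(1) + saveAsTextFile pattern comes from the thread:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  object SaveNonEmptyBatches {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("SaveNonEmptyBatches")
      val ssc = new StreamingContext(conf, Seconds(10))

      // Word counts over a socket source (host and port are placeholders)
      val wordCounts = ssc.socketTextStream("localhost", 9999)
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // Persist at the DStream level: each batch RDD is cached, so the
      // partitions computed by take(1) are reused by saveAsTextFile
      wordCounts.persist()

      wordCounts.foreachRDD { (rdd, time) =>
        if (rdd.take(1).nonEmpty) {  // cheap check that the batch has data
          rdd.saveAsTextFile("counts-" + time.milliseconds)  // made-up prefix
        }
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }

Whether persist() is worth it here follows the trade-off above: take(1) touches at most a partition or two, so skipping persist() saves memory at the cost of recomputing those partitions during the save.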