Hi TD,

Thanks a lot for the comprehensive answer.
I think this explanation deserves some place in the Spark Streaming tuning guide.

-kr, Gerard.

On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Hey Gerard,
>
> This is a very good question!
>
> *TL;DR:* The performance should be the same, except in the case of
> shuffle-based operations where the number of reducers is not explicitly
> specified.
>
> Let me answer in more detail by dividing the set of DStream operations
> into four categories.
>
> *1. Map-like operations (map, flatMap, filter, etc.) that do not involve
> any shuffling of data:* Performance should be virtually the same in both
> cases. Either way, in each batch, the operations on the batch's RDD are
> first set up on the driver, and then the actions on the RDD are executed.
> There are very minor differences between the two cases of early foreachRDD
> and late foreachRDD (e.g., cleaning up of function closures), but those
> should make almost no difference in performance.
>
> *2. Operations involving shuffle:* Here there is a subtle difference
> between the two cases if the number of partitions is not specified. The
> default number of partitions used with dstream.reduceByKey() and with
> dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play
> around with the number of reducers to see what performs better. But if the
> number of reducers is explicitly specified and is the same in both cases,
> then the performance should be similar. Note that this difference in the
> defaults is not guaranteed to stay this way; it could change in future
> implementations.
>
> *3. Aggregation-like operations (count, reduce):* Here there is another
> subtle execution difference between
> - dstream.count(), which produces a DStream of single-element RDDs, the
> element being the count, and
> - dstream.foreachRDD(_.count()), which returns the count directly.
> In the first case, some arbitrary worker node is chosen for the reduce;
> in the other, the driver is chosen for the reduce. There should not be a
> significant performance difference.
>
> *4. Other operations*, including window ops and stateful ops
> (updateStateByKey), are obviously not part of the discussion, as they
> cannot be (easily) done through early foreachRDD.
>
> Hope this helps!
>
> TD
>
> PS: Sorry for not noticing this question earlier.
>
> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> PS: Just to clarify my statement:
>>
>> >> Unlike the feared RDD operations on the driver, it's my understanding
>> >> that these DStream ops on the driver are merely creating an execution
>> >> plan for each RDD.
>>
>> With "feared RDD operations on the driver" I meant to contrast an RDD
>> action like rdd.collect, which would pull all the RDD data to the driver,
>> with dstream.foreachRDD(rdd => rdd.op), for which the documentation says
>> 'it runs on the driver', yet all that actually runs on the driver is the
>> scheduling of 'op' on that RDD, just as happens for all other RDD
>> operations. (Thanks to Sean for the clarification.)
>>
>> So, not to move focus away from the original question:
>>
>> In Spark Streaming, would it be better to do foreachRDD early in a
>> pipeline, or instead do as many DStream transformations as possible
>> before going into the foreachRDD call?
>> Between these two pieces of code, from a performance perspective, which
>> would be preferred, and why?
>>
>> - Early foreachRDD:
>>
>> dstream.foreachRDD { rdd =>
>>   val records = rdd.map(elem => record(elem))
>>   targets.foreach(target => records.filter(record =>
>>     isTarget(target, record)).writeToCassandra(target, table))
>> }
>>
>> - As many DStream transformations as possible before foreachRDD:
>>
>> val recordStream = dstream.map(elem => record(elem))
>> targets.foreach { target =>
>>   recordStream.filter(record => isTarget(target, record))
>>     .foreachRDD(_.writeToCassandra(target, table))
>> }
>>
>> kr, Gerard.
>>
>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>
>>> Thanks Matt,
>>>
>>> Unlike the feared RDD operations on the driver, it's my understanding
>>> that these DStream ops on the driver are merely creating an execution
>>> plan for each RDD.
>>>
>>> My question still remains: is it better to foreachRDD early in the
>>> process, or to do as many DStream transformations as possible before
>>> going into the foreachRDD call?
>>>
>>> Maybe this will require some empirical testing specific to each
>>> implementation?
>>>
>>> -kr, Gerard.
>>>
>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>
>>>> foreachRDD is executed on the driver….
>>>>
>>>> mn
>>>>
>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>
>>>> Pinging TD -- I'm sure you know :-)
>>>>
>>>> -kr, Gerard.
>>>>
>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have been implementing several Spark Streaming jobs that are
>>>>> basically processing data and inserting it into Cassandra, sorting it
>>>>> among different keyspaces.
>>>>> We've been following the pattern:
>>>>>
>>>>> dstream.foreachRDD { rdd =>
>>>>>   val records = rdd.map(elem => record(elem))
>>>>>   targets.foreach(target => records.filter(record =>
>>>>>     isTarget(target, record)).writeToCassandra(target, table))
>>>>> }
>>>>>
>>>>> I've been wondering whether there would be a performance difference in
>>>>> transforming the dstream instead of transforming the RDD within the
>>>>> dstream, with regard to how the transformations get scheduled.
>>>>>
>>>>> Instead of the RDD-centric computation, I could transform the dstream
>>>>> until the last step, where I need an RDD to store.
>>>>> For example, the previous transformation could be written as:
>>>>>
>>>>> val recordStream = dstream.map(elem => record(elem))
>>>>> targets.foreach { target =>
>>>>>   recordStream.filter(record => isTarget(target, record))
>>>>>     .foreachRDD(_.writeToCassandra(target, table))
>>>>> }
>>>>>
>>>>> Would there be a difference in execution and/or performance? What
>>>>> would be the preferred way to do this?
>>>>>
>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>> data into different "buckets" instead of filtering the data collection
>>>>> #buckets times?
>>>>>
>>>>> thanks, Gerard.
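[Editor's note: to make TD's point 2 and the bonus question concrete, here is a minimal, untested sketch in Spark Streaming Scala. The names `record`, `isTarget`, and `targets` come from the thread; the type aliases, the function `bucketed`, and the partition count of 24 are hypothetical placeholders, and a running StreamingContext is assumed.]

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical types standing in for the thread's record/target model.
type Elem   = String
type Record = String
type Target = String

// TD's point 2: for shuffle-based ops, specify the number of reducers
// explicitly, e.g. reduceByKey(f, numPartitions), so that the early- and
// late-foreachRDD variants shuffle with the same parallelism regardless
// of their differing defaults.
val numPartitions = 24 // placeholder; tune empirically per cluster

// Bonus question: rather than running one filtered pass over the same
// records per target (#buckets passes), tag each record with its matching
// target(s) in a single pass over the data, producing a keyed stream.
def bucketed(dstream: DStream[Elem],
             targets: Seq[Target],
             record: Elem => Record,
             isTarget: (Target, Record) => Boolean): DStream[(Target, Record)] =
  dstream
    .map(record)
    .flatMap(rec => targets.filter(t => isTarget(t, rec)).map(t => (t, rec)))

// Writing still needs one action per Cassandra table, but the input is now
// traversed once instead of once per bucket, and the keyed stream can be
// grouped or partitioned by target before writing.
```

A side benefit of keying the stream this way is that pair-DStream operations (groupByKey, reduceByKey, partitionBy on the underlying RDDs) become available for colocating each bucket's records before the per-table write.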