Hi TD,

Thanks a lot for the comprehensive answer.

I think this explanation deserves a place in the Spark Streaming tuning
guide.

-kr, Gerard.

On Thu, Oct 23, 2014 at 11:41 PM, Tathagata Das <tathagata.das1...@gmail.com
> wrote:

> Hey Gerard,
>
> This is a very good question!
>
> *TL;DR: *The performance should be the same, except in the case of
> shuffle-based operations where the number of reducers is not explicitly
> specified.
>
> Let me answer in more detail by dividing the set of DStream operations
> into four categories.
>
> *1. Map-like operations (map, flatMap, filter, etc.) that do not involve
> any shuffling of data:* Performance should be virtually the same in both
> cases. Either way, in each batch, the transformations on the batch's RDD
> are first set up on the driver, and then the actions on the RDD are
> executed. There are very minor differences between the two cases of early
> foreachRDD and late foreachRDD (e.g., cleaning up of function closures,
> etc.), but those should make almost no difference in performance.
>
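> For concreteness, a minimal sketch of the two styles for a shuffle-free
> pipeline (`parse` and `keep` are hypothetical functions, just for
> illustration):
>
> // Late foreachRDD: compose the DStream transformations, act at the end.
> dstream.map(parse).filter(keep).foreachRDD { rdd =>
>   rdd.foreach(println)
> }
>
> // Early foreachRDD: the same logic expressed on each batch's RDD.
> dstream.foreachRDD { rdd =>
>   rdd.map(parse).filter(keep).foreach(println)
> }
>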
> *2. Operations involving shuffle: *Here there is a subtle difference
> between the two cases if the number of partitions is not specified. The
> default number of partitions used by dstream.reduceByKey() and by
> dstream.foreachRDD(_.reduceByKey()) are different, and one needs to play
> around with the number of reducers to see what performs better. But if the
> number of reducers is explicitly specified and is the same in both cases,
> then the performance should be similar. Note that these defaults are not
> guaranteed to stay this way; they could change in future implementations.
>
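> As a sketch of pinning the reducers explicitly in both forms (assuming a
> hypothetical `pairStream` of, say, (String, Int) pairs; 8 is just an
> illustrative value):
>
> val numPartitions = 8  // illustrative; tune for your cluster
>
> // DStream form, with an explicit partition count:
> pairStream.reduceByKey(_ + _, numPartitions).print()
>
> // Early-foreachRDD form, with the same count:
> pairStream.foreachRDD { rdd =>
>   rdd.reduceByKey(_ + _, numPartitions).foreach(println)
> }
>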
> *3. Aggregation-like operations (count, reduce): *Here there is another
> subtle execution difference between
> - dstream.count() which produces a DStream of single-element RDDs, the
> element being the count, and
> - dstream.foreachRDD(_.count()) which returns the count directly.
>
> In the first case, some random worker node is chosen for the reduce; in
> the other, the driver is chosen for the reduce. There should not be a
> significant performance difference.
>
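> A sketch of the two counting styles side by side:
>
> // dstream.count(): a DStream[Long] of single-element RDDs; an output
> // operation is still needed to trigger execution.
> dstream.count().print()
>
> // foreachRDD(_.count()): the RDD action runs per batch and hands the
> // count straight back to the driver.
> dstream.foreachRDD { rdd =>
>   println(s"batch size: ${rdd.count()}")
> }
>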
> *4. Other operations,* including window ops and stateful ops
> (updateStateByKey), are obviously not part of this discussion, as they
> cannot be (easily) done through early foreachRDD.
>
> Hope this helps!
>
> TD
>
> PS: Sorry for not noticing this question earlier.
>
> On Wed, Oct 22, 2014 at 5:37 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> PS: Just to clarify my statement:
>>
>> >>Unlike the feared RDD operations on the driver, it's my understanding
>> that these Dstream ops on the driver are merely creating an execution plan
>> for each RDD.
>>
>> With "feared RDD operations on the driver" I meant to contrast an RDD
>> action like rdd.collect, which would pull all the RDD's data to the
>> driver, with dstream.foreachRDD(rdd => rdd.op), for which the
>> documentation says 'it runs on the driver'. Yet all that appears to run
>> on the driver is the scheduling of 'op' on that RDD, just as happens for
>> all other RDD operations (thanks to Sean for the clarification).
>>
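>> To make that concrete, a small sketch (assuming a DStream of strings and
>> a hypothetical `save` function of type Iterator[String] => Unit):
>>
>> dstream.foreachRDD { rdd =>
>>   // This closure runs on the driver, but it only builds and schedules a plan:
>>   val cleaned = rdd.filter(_.nonEmpty)  // lazy transformation
>>   cleaned.foreachPartition(save)        // action: the work runs on the executors
>>   // whereas this would pull the whole RDD's data to the driver:
>>   // val local = cleaned.collect()
>> }
>>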
>> So, not to move focus away from the original question:
>>
>> In Spark Streaming, would it be better to do foreachRDD early in a
>> pipeline, or instead to do as many DStream transformations as possible
>> before going into the foreachRDD call?
>>
>> Between these two pieces of code, from a performance perspective, what
>> would be preferred and why:
>>
>> - Early foreachRDD:
>>
>> dstream.foreachRDD { rdd =>
>>   val records = rdd.map(elem => record(elem))
>>   targets.foreach(target =>
>>     records.filter(record => isTarget(target, record))
>>       .writeToCassandra(target, table))
>> }
>>
>> - As many DStream transformations as possible before foreachRDD:
>>
>> val recordStream = dstream.map(elem => record(elem))
>> targets.foreach { target =>
>>   recordStream.filter(record => isTarget(target, record))
>>     .foreachRDD(_.writeToCassandra(target, table))
>> }
>>
>> ?
>>
>> kr, Gerard.
>>
>>
>>
>> On Wed, Oct 22, 2014 at 2:12 PM, Gerard Maas <gerard.m...@gmail.com>
>> wrote:
>>
>>> Thanks Matt,
>>>
>>> Unlike the feared RDD operations on the driver, it's my understanding
>>> that these Dstream ops on the driver are merely creating an execution plan
>>> for each RDD.
>>> My question still remains: Is it better to foreachRDD early in the
>>> process, or to do as many DStream transformations as possible before
>>> going into the foreachRDD call?
>>>
>>> Maybe this will require some empirical testing specific to each
>>> implementation?
>>>
>>> -kr, Gerard.
>>>
>>>
>>> On Mon, Oct 20, 2014 at 5:07 PM, Matt Narrell <matt.narr...@gmail.com>
>>> wrote:
>>>
>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html
>>>>
>>>> foreachRDD is executed on the driver….
>>>>
>>>> mn
>>>>
>>>> On Oct 20, 2014, at 3:07 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>
>>>> Pinging TD  -- I'm sure you know :-)
>>>>
>>>> -kr, Gerard.
>>>>
>>>> On Fri, Oct 17, 2014 at 11:20 PM, Gerard Maas <gerard.m...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have been implementing several Spark Streaming jobs that are
>>>>> basically processing data and inserting it into Cassandra, sorting it
>>>>> among different keyspaces.
>>>>>
>>>>> We've been following the pattern:
>>>>>
>>>>> dstream.foreachRDD { rdd =>
>>>>>   val records = rdd.map(elem => record(elem))
>>>>>   targets.foreach(target =>
>>>>>     records.filter(record => isTarget(target, record))
>>>>>       .writeToCassandra(target, table))
>>>>> }
>>>>>
>>>>> I've been wondering whether there would be a performance difference in
>>>>> transforming the dstream instead of transforming the RDD within the
>>>>> dstream, with regards to how the transformations get scheduled.
>>>>>
>>>>> Instead of the RDD-centric computation, I could transform the dstream
>>>>> until the last step, where I need an RDD to store. For example, the
>>>>> previous transformation could be written as:
>>>>>
>>>>> val recordStream = dstream.map(elem => record(elem))
>>>>> targets.foreach { target =>
>>>>>   recordStream.filter(record => isTarget(target, record))
>>>>>     .foreachRDD(_.writeToCassandra(target, table))
>>>>> }
>>>>>
>>>>> Would there be a difference in execution and/or performance? What
>>>>> would be the preferred way to do this?
>>>>>
>>>>> Bonus question: Is there a better (more performant) way to sort the
>>>>> data into different "buckets" than filtering the whole collection once
>>>>> per bucket (i.e., #buckets times)?
>>>>>
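>>>>> One mitigation we have considered (just a sketch, assuming the batch
>>>>> fits comfortably in cache) is caching the mapped RDD so the repeated
>>>>> filter passes at least reuse one materialized dataset:
>>>>>
>>>>> dstream.foreachRDD { rdd =>
>>>>>   val records = rdd.map(elem => record(elem)).cache()
>>>>>   targets.foreach(target =>
>>>>>     records.filter(record => isTarget(target, record))
>>>>>       .writeToCassandra(target, table))
>>>>>   records.unpersist()
>>>>> }
>>>>>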
>>>>> thanks,  Gerard.
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
