I think you're complicating the cache behavior by aggressively reusing
vars where temporary vals would be more straightforward. For example,
newBase = newBase.unpersist()... effectively means that newBase's data is
no longer cached when the subsequent .union(...) is performed, so Spark
probably has to go back to the lineage and recompute it. The same applies
to the current.unpersist() call before it.

Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()
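
For concreteness, here's a rough, untested sketch of the whole loop along
those lines (reusing your myfilter/mymap/mynotfilter; nextBase is just a
throwaway name I made up, and I've kept your ", 2" for now, pending the
partitions question below). Every unpersist() is deferred until after the
newly cached RDDs have actually been materialized, since unpersist takes
effect immediately:

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
    // build the new state from the old, without touching the old caches yet
    val newCurrent = rdd.union(current).reduceByKey(_ + _, 2)

    val joined = newCurrent.leftOuterJoin(newBase).cache()
    val toUpdate = joined.filter(myfilter).map(mymap).cache()
    val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

    toUpdate.foreach(println)   // goes to your store (no collect needed)

    val nextBase = newBase.union(toUpdate).reduceByKey(_ + _, 2).cache()
    nextBase.count()            // force the new base into the cache
    toNotUpdate.count()         // force the new "current" into the cache

    // only now is it safe to drop the previous batch's cached data
    newBase.unpersist()
    current.unpersist()
    joined.unpersist()
    toUpdate.unpersist()

    newBase = nextBase
    current = toNotUpdate
  })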

Also, what happens if you omit the "2" argument for the number of
partitions in reduceByKey?

Other minor points:

I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()

val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()
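
One caveat with that (same names as above, just a sketch): both filtered
RDDs read from joined's cache, so release it only after an action has run
on each of them, e.g.:

    toUpdate.count()      // or whatever action ships it to your store
    toNotUpdate.count()
    joined.unpersist()    // safe now; both children are cached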


Maybe it's just for this email example, but you don't need to call collect
on toUpdate before using foreach(println). If the RDD is huge, you
definitely don't want to do that.
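
If you just want to eyeball a few records on the driver, something like
this keeps the driver-side footprint small (the 20 is arbitrary):

    toUpdate.take(20).foreach(println)   // bounded sample on the driver
    // or print/write from the executors without collecting at all:
    toUpdate.foreach(println)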

Hope this helps.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya <anand.na...@gmail.com> wrote:

> Yes, myRDD is outside of the DStream. The following is the actual code,
> where newBase and current are the RDDs being updated with each batch:
>
>   val base = sc.textFile...
>   var newBase = base.cache()
>
>   val dstream: DStream[String] = ssc.textFileStream...
>   var current: RDD[(String, Long)] = sc.emptyRDD.cache()
>
>   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
>
>     current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
>
>     val joined = current.leftOuterJoin(newBase).cache()
>     val toUpdate = joined.filter(myfilter).map(mymap).cache()
>     val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
>
>     toUpdate.collect().foreach(println) // this goes to some store
>
>     newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
> 2).cache()
>
>     current = toNotUpdate.cache()
>
>     toUpdate.unpersist()
>     joined.unpersist()
>     rdd.unpersist()
>   })
>
>
> Regards,
>
> Anand
>
>
> On 9 July 2015 at 18:16, Dean Wampler <deanwamp...@gmail.com> wrote:
>
>> Is myRDD outside a DStream? If so, are you persisting it on each batch
>> iteration? It should be checkpointed frequently too.
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.na...@gmail.com>
>> wrote:
>>
>>> The data coming from the dstream has the same keys that are in myRDD, so
>>> the reduceByKey after the union keeps the overall tuple count in myRDD
>>> fixed. Even with a fixed tuple count, will it keep consuming more
>>> resources?
>>>
>>> On 9 July 2015 at 16:19, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> If you are continuously unioning RDDs, then you are accumulating ever
>>>> more data, and you are processing an ever-increasing amount of data in
>>>> every batch. Obviously this is not going to last for very long. You
>>>> fundamentally cannot keep processing an ever-increasing amount of data
>>>> with finite resources, can you?
>>>>
>>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.na...@gmail.com>
>>>> wrote:
>>>>
>>>>> That's from the Streaming tab of the Spark 1.4 Web UI.
>>>>>
>>>>> On 9 July 2015 at 15:35, Michel Hubert <mich...@vsnsystemen.nl> wrote:
>>>>>
>>>>>>  Hi,
>>>>>>
>>>>>> I was just wondering how you generated the second image with the
>>>>>> charts. What product?
>>>>>>
>>>>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>>>>> *Sent:* Thursday, July 9, 2015 11:48
>>>>>> *To:* spark users
>>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have an application in which an RDD is being updated with tuples
>>>>>> coming from RDDs in a DStream, using the following pattern:
>>>>>>
>>>>>> dstream.foreachRDD(rdd => {
>>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>>> })
>>>>>>
>>>>>> I'm using cache() and checkpointing to cache results. Over time, the
>>>>>> lineage of myRDD keeps growing, and the number of stages in each batch
>>>>>> of the dstream keeps increasing, even though all the earlier stages are
>>>>>> skipped. When the number of stages grows big enough, the overall delay
>>>>>> due to scheduling starts increasing. The processing time for each batch
>>>>>> is still fixed.
>>>>>>
>>>>>> The following figures illustrate the problem:
>>>>>>
>>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>>
>>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>>
>>>>>> Is there some pattern that I can use to avoid this?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Anand
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
