Thanks for the help Dean/TD,

I was able to cut the lineage with checkpointing, using the following code:

dstream.countByValue().foreachRDD((rdd, time) => {
    val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
    val toUpdate = joined.filter(myfilter).map(mymap)
    val toNotUpdate = joined.filter(mynotfilter).map(mymap)

    base = base.union(toUpdate).reduceByKey(_+_, 2)
    current = toNotUpdate

    if (time.isMultipleOf(duration)) {
      base.checkpoint()
      current.checkpoint()
    }
    println(toUpdate.count()) // action; output goes to the persistent store
  })
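
A note on ordering: RDD.checkpoint() is lazy. It only marks the RDD, and the
data is written out the first time a job actually computes that RDD (here,
the next batch's union/join over base and current), which is when the
lineage gets truncated.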

Thanks,
Anand

On 10 July 2015 at 02:16, Tathagata Das <t...@databricks.com> wrote:

> Summarizing the main problems discussed by Dean:
>
> 1. If you have an infinitely growing lineage, bad things will eventually
> happen. You HAVE TO checkpoint the information periodically (say, every
> 10th batch).
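>
> A minimal sketch of the periodic trigger (the batch counter and the
> modulus of 10 are illustrative; any "every Nth batch" condition works):
>
> var batchCount = 0L
> dstream.foreachRDD { rdd =>
>   batchCount += 1
>   // ... update current/base as in your code ...
>   if (batchCount % 10 == 0) {  // every 10th batch
>     base.checkpoint()          // writes to the dir from sc.setCheckpointDir
>     current.checkpoint()       // and truncates the growing lineage
>   }
> }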
>
> 2. Unpersist the previous `current` RDD ONLY AFTER running an action on
> the `newCurrent`. Otherwise you are throwing current out of the cache
> before newCurrent has been computed. Modifying Dean's example:
>
> val newCurrent = rdd.union(current).reduceByKey(_+_)
> ...
> // join with newCurrent
> // collect or count or any action that uses newCurrent
> //
>
> // Now you can unpersist, because newCurrent has been persisted and won't
> // require falling back to this cached current RDD.
> current.unpersist()
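>
> Spelled out with the names from Anand's code (cache() and count() are one
> possible persist/action pair, not the only choice):
>
> val newCurrent = rdd.union(current).reduceByKey(_+_).cache()
> val joined = newCurrent.leftOuterJoin(newBase)
> joined.count()       // any action; this materializes and caches newCurrent
> current.unpersist()  // safe now, newCurrent no longer falls back to it
> current = newCurrent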
>
>
> On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler <deanwamp...@gmail.com>
> wrote:
>
>> I think you're complicating the cache behavior by aggressively re-using
>> vars when temporary vals would be more straightforward. For example,
>> newBase = newBase.unpersist()... effectively means that newBase's data is
>> not actually cached when the subsequent .union(...) is performed, so it
>> probably goes back to the lineage... Same with the current.unpersist logic
>> before it.
>>
>> Names are cheap, so just use local vals:
>>
>> val newCurrent = rdd.union(current).reduceByKey(_+_)
>> current.unpersist()
>>
>> Also, what happens if you omit the "2" argument for the number of
>> partitions in reduceByKey?
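>>
>> (Without the explicit argument it falls back to the default partitioner,
>> so the partition count typically comes from the largest parent RDD or from
>> spark.default.parallelism. Easy to inspect:)
>>
>> val reduced = rdd.reduceByKey(_ + _)  // no numPartitions argument
>> println(reduced.partitions.length)   // see what you actually got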
>>
>> Other minor points:
>>
>> I would change the joined, toUpdate, toNotUpdate logic to this:
>>
>> val joined = current.leftOuterJoin(newBase).map(mymap).cache()
>>
>> val toUpdate = joined.filter(myfilter).cache()
>> val toNotUpdate = joined.filter(mynotfilter).cache()
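>>
>> Caching joined matters here because two different filters read it; without
>> the cache, the join would be recomputed once for each of them.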
>>
>>
>> Maybe it's just for this email example, but you don't need to call
>> collect on toUpdate before using foreach(println). If the RDD is huge, you
>> definitely don't want to do that.
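>>
>> If the real goal is pushing toUpdate to a store, a per-partition write
>> avoids the driver round-trip entirely (the store calls below are
>> hypothetical stand-ins, not a real API):
>>
>> toUpdate.foreachPartition { records =>
>>   // runs on the executors; open one connection per partition
>>   // val store = connectToStore()   // hypothetical sink
>>   records.foreach(println)          // replace with store.write(...)
>> }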
>>
>> Hope this helps.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya <anand.na...@gmail.com>
>> wrote:
>>
>>> Yes, myRDD is outside of the DStream. The following is the actual code, where
>>> newBase and current are the RDDs being updated with each batch:
>>>
>>>   val base = sc.textFile...
>>>   var newBase = base.cache()
>>>
>>>   val dstream: DStream[String] = ssc.textFileStream...
>>>   var current: RDD[(String, Long)] = sc.emptyRDD.cache()
>>>
>>>   dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
>>>
>>>     current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
>>>
>>>     val joined = current.leftOuterJoin(newBase).cache()
>>>     val toUpdate = joined.filter(myfilter).map(mymap).cache()
>>>     val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
>>>
>>>     toUpdate.collect().foreach(println) // this goes to some store
>>>
>>>     newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
>>>
>>>     current = toNotUpdate.cache()
>>>
>>>     toUpdate.unpersist()
>>>     joined.unpersist()
>>>     rdd.unpersist()
>>>   })
>>>
>>>
>>> Regards,
>>>
>>> Anand
>>>
>>>
>>> On 9 July 2015 at 18:16, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>
>>>> Is myRDD outside a DStream? If so, are you persisting it on each batch
>>>> iteration? It should be checkpointed frequently too.
>>>>
>>>> Dean Wampler, Ph.D.
>>>> Author: Programming Scala, 2nd Edition
>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>> Typesafe <http://typesafe.com>
>>>> @deanwampler <http://twitter.com/deanwampler>
>>>> http://polyglotprogramming.com
>>>>
>>>> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.na...@gmail.com>
>>>> wrote:
>>>>
>>>>> The data coming from the dstream has the same keys that are in myRDD, so
>>>>> the reduceByKey after the union keeps the overall tuple count in myRDD
>>>>> fixed. Even with a fixed tuple count, will it keep consuming more
>>>>> resources?
>>>>>
>>>>> On 9 July 2015 at 16:19, Tathagata Das <t...@databricks.com> wrote:
>>>>>
>>>>>> If you are continuously unioning RDDs, then you are accumulating
>>>>>> ever-increasing data, and you are processing an ever-increasing amount of
>>>>>> data in every batch. Obviously this is not going to last very long. You
>>>>>> fundamentally cannot keep processing an ever-increasing amount of data
>>>>>> with finite resources, can you?
>>>>>>
>>>>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.na...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That's from the Streaming tab of the Spark 1.4 WebUI.
>>>>>>>
>>>>>>> On 9 July 2015 at 15:35, Michel Hubert <mich...@vsnsystemen.nl>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was just wondering how you generated the second image with the
>>>>>>>> charts. What product?
>>>>>>>>
>>>>>>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>>>>>>> *Sent:* donderdag 9 juli 2015 11:48
>>>>>>>> *To:* spark users
>>>>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have an application in which an RDD is being updated with tuples
>>>>>>>> coming from RDDs in a DStream, with the following pattern.
>>>>>>>>
>>>>>>>> dstream.foreachRDD(rdd => {
>>>>>>>>   // lineage of myRDD grows by one union + reduceByKey per batch
>>>>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>>>>> })
>>>>>>>>
>>>>>>>> I'm using cache() and checkpointing to cache results. Over time, the
>>>>>>>> lineage of myRDD keeps growing, and the number of stages in each batch of
>>>>>>>> the dstream keeps increasing, even though all the earlier stages are
>>>>>>>> skipped. When the number of stages grows big enough, the scheduling delay
>>>>>>>> starts increasing. The processing time for each batch stays fixed.
>>>>>>>>
>>>>>>>> The following figures illustrate the problem:
>>>>>>>>
>>>>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>>>>
>>>>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>>>>
>>>>>>>> Is there some pattern that I can use to avoid this?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Anand
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
