Thanks for the help Dean/TD, I was able to cut the lineage with checkpointing using the following code:
dstream.countByValue().foreachRDD((rdd, time) => {
  val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
  val toUpdate = joined.filter(myfilter).map(mymap)
  val toNotUpdate = joined.filter(mynotfilter).map(mymap)

  base = base.union(toUpdate).reduceByKey(_+_, 2)
  current = toNotUpdate

  if (time.isMultipleOf(duration)) {
    base.checkpoint()
    current.checkpoint()
  }

  println(toUpdate.count()) // to persistence
})

Thanks,
Anand

On 10 July 2015 at 02:16, Tathagata Das <t...@databricks.com> wrote:

> Summarizing the main problems discussed by Dean:
>
> 1. If you have an infinitely growing lineage, bad things will eventually
> happen. You HAVE TO periodically (say, every 10th batch) checkpoint the
> information.
>
> 2. Unpersist the previous `current` RDD ONLY AFTER running an action on
> the `newCurrent`. Otherwise you are throwing current out of the cache
> before newCurrent has been computed. Modifying Dean's example:
>
> val newCurrent = rdd.union(current).reduceByKey(_+_)
> ...
> // join with newCurrent
> // collect or count or any action that uses newCurrent
>
> // Now you can unpersist, because newCurrent has been persisted and
> // won't require falling back to this cached current RDD.
> current.unpersist()
>
> On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler <deanwamp...@gmail.com> wrote:
>
>> I think you're complicating the cache behavior by aggressively re-using
>> vars when temporary vals would be more straightforward. For example,
>> newBase = newBase.unpersist()... effectively means that newBase's data is
>> not actually cached when the subsequent .union(...) is performed, so it
>> probably goes back to the lineage... Same with the current.unpersist logic
>> before it.
>>
>> Names are cheap, so just use local vals:
>>
>> val newCurrent = rdd.union(current).reduceByKey(_+_)
>> current.unpersist()
>>
>> Also, what happens if you omit the "2" argument for the number of
>> partitions in reduceByKey?
>>
>> Other minor points:
>>
>> I would change the joined, toUpdate, toNotUpdate logic to this:
>>
>> val joined = current.leftOuterJoin(newBase).map(mymap).cache()
>>
>> val toUpdate = joined.filter(myfilter).cache()
>> val toNotUpdate = joined.filter(mynotfilter).cache()
>>
>> Maybe it's just for this email example, but you don't need to call
>> collect on toUpdate before using foreach(println). If the RDD is huge, you
>> definitely don't want to do that.
>>
>> Hope this helps.
>>
>> dean
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>> Typesafe <http://typesafe.com>
>> @deanwampler <http://twitter.com/deanwampler>
>> http://polyglotprogramming.com
>>
>> On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya <anand.na...@gmail.com> wrote:
>>
>>> Yes, myRDD is outside of the DStream. Following is the actual code, where
>>> newBase and current are the RDDs being updated with each batch:
>>>
>>> val base = sc.textFile...
>>> var newBase = base.cache()
>>>
>>> val dstream: DStream[String] = ssc.textFileStream...
>>> var current: RDD[(String, Long)] = sc.emptyRDD.cache()
>>>
>>> dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
>>>
>>>   current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
>>>
>>>   val joined = current.leftOuterJoin(newBase).cache()
>>>   val toUpdate = joined.filter(myfilter).map(mymap).cache()
>>>   val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
>>>
>>>   toUpdate.collect().foreach(println) // this goes to some store
>>>
>>>   newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
>>>
>>>   current = toNotUpdate.cache()
>>>
>>>   toUpdate.unpersist()
>>>   joined.unpersist()
>>>   rdd.unpersist()
>>> })
>>>
>>> Regards,
>>> Anand
>>>
>>> On 9 July 2015 at 18:16, Dean Wampler <deanwamp...@gmail.com> wrote:
>>>
>>>> Is myRDD outside a DStream? If so, are you persisting it on each batch
>>>> iteration? It should be checkpointed frequently too.
>>>>
>>>> Dean Wampler, Ph.D.
>>>> Author: Programming Scala, 2nd Edition
>>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>>> Typesafe <http://typesafe.com>
>>>> @deanwampler <http://twitter.com/deanwampler>
>>>> http://polyglotprogramming.com
>>>>
>>>> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.na...@gmail.com> wrote:
>>>>
>>>>> The data coming from the dstream has the same keys that are in myRDD, so
>>>>> the reduceByKey after the union keeps the overall tuple count in myRDD
>>>>> fixed. Or, even with a fixed tuple count, will it keep consuming more
>>>>> resources?
>>>>>
>>>>> On 9 July 2015 at 16:19, Tathagata Das <t...@databricks.com> wrote:
>>>>>
>>>>>> If you are continuously unioning RDDs, then you are accumulating ever
>>>>>> increasing data, and you are processing an ever increasing amount of
>>>>>> data in every batch. Obviously this is not going to last for very long.
>>>>>> You fundamentally cannot keep processing an ever increasing amount of
>>>>>> data with finite resources, isn't it?
>>>>>>
>>>>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.na...@gmail.com> wrote:
>>>>>>
>>>>>>> That's from the Streaming tab of the Spark 1.4 WebUI.
>>>>>>>
>>>>>>> On 9 July 2015 at 15:35, Michel Hubert <mich...@vsnsystemen.nl> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I was just wondering how you generated the second image with the
>>>>>>>> charts.
>>>>>>>>
>>>>>>>> What product?
>>>>>>>>
>>>>>>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>>>>>>> *Sent:* Thursday, 9 July 2015 11:48
>>>>>>>> *To:* spark users
>>>>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have an application in which an RDD is being updated with tuples
>>>>>>>> coming from RDDs in a DStream, with the following pattern:
>>>>>>>>
>>>>>>>> dstream.foreachRDD(rdd => {
>>>>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>>>>> })
>>>>>>>>
>>>>>>>> I'm using cache() and checkpointing to cache results. Over time,
>>>>>>>> the lineage of myRDD keeps increasing and the stages in each batch of
>>>>>>>> the dstream keep increasing, even though all the earlier stages are
>>>>>>>> skipped. When the number of stages grows big enough, the overall
>>>>>>>> scheduling delay starts increasing. The processing time for each
>>>>>>>> batch is still fixed.
>>>>>>>>
>>>>>>>> Following figures illustrate the problem:
>>>>>>>>
>>>>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>>>>
>>>>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>>>>
>>>>>>>> Is there some pattern that I can use to avoid this?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Anand
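---

For reference, a minimal self-contained sketch of the pattern the thread converged on: fold each batch into a cached state RDD, checkpoint that RDD periodically so its lineage is cut, and run an action on the new RDD before unpersisting the old one. The socket source, checkpoint directory, and intervals below are placeholders, and the base/myfilter/mymap bookkeeping from the thread is left out to keep the lineage-cutting part in focus.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LineageCutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lineage-cut-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))    // batch interval: placeholder
    ssc.checkpoint("/tmp/spark-checkpoints")               // checkpoint dir: placeholder

    // Running state of (value, count), updated on the driver across batches.
    var current: RDD[(String, Long)] = ssc.sparkContext.emptyRDD[(String, Long)].cache()

    val dstream = ssc.socketTextStream("localhost", 9999)  // placeholder source

    dstream.countByValue().foreachRDD { (rdd, time) =>
      // Fold this batch into the running state.
      val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()

      // Cut the lineage periodically, e.g. every 100 seconds (~every 10th batch here).
      if (time.isMultipleOf(Seconds(100))) newCurrent.checkpoint()

      // Materialize newCurrent with an action BEFORE dropping the old cached RDD,
      // so recomputing it never has to fall back past the unpersisted data.
      println(s"state size at $time: ${newCurrent.count()}")

      current.unpersist()
      current = newCurrent
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Compared with the code earlier in the thread, this keeps the checkpoint call on the same RDD that the next batch will union against, which is what actually truncates the growing lineage.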