On Wed, May 28, 2014 at 12:08 AM, Ankur Dave <ankurd...@gmail.com> wrote:
> I think what's desired here is for `input` to be unpersisted automatically
> as soon as `result` is materialized. I don't think there's currently a way
> to do this, but the usual workaround is to force `result` to be
> materialized immediately and then unpersist `input`:
>
>     input.cache()
>     val count = input.count
>     val result = input.filter(...)
>     result.cache().foreach(x => {}) // materialize `result`
>     input.unpersist() // safe because `result` is materialized
>                       // and is the only RDD that depends on `input`
>     return result
>
> Thanks!

Right, this would work. But maybe we don't want to cache the result. We often work with RDDs that are too big to cache, but because we process them partition by partition, the whole RDD never needs to exist in memory at once. You probably know what I mean, but here's a small example:

    1  def blowUp(input: RDD[Int]): RDD[Int] = {
    2    input.cache()
    3    println(input.count)
    4    val result = input.flatMap(i => 0 to i)
    5    // `result` is too big to cache :(
    6    return result
    7  }
    8
    9  println(blowUp(sc.parallelize(0 to 10000, 1000)).count)

We were discussing these difficulties, and Andras had an interesting idea: what if you could have a limited-use cache? You could say `input.cacheFor(2)` and the RDD would stay cached for 2 actions, then be dropped. In the example above, the first action would be the `count` on line 3, and the second would be the `count` on line 9. So blowUp() would guarantee that its return value is good for 1 action (in the sense that `input` would not be recalculated). If the caller wants to use it more than once, they can cache it as usual.
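Spark has no such `cacheFor` API, and no hook that fires when an action completes, so a rough approximation would have to count actions explicitly in user code. Below is a minimal sketch of that idea; `Cacheable`, `LimitedCache`, and `withAction` are all hypothetical names I made up for illustration, and the `Cacheable` trait is a stand-in so the sketch runs without Spark (with a real RDD you would wrap `RDD.cache()` / `RDD.unpersist()` the same way):

```scala
// Hypothetical sketch of a limited-use cache. `Cacheable` is a minimal
// stand-in for the cache/unpersist part of the RDD interface.
trait Cacheable {
  def cache(): Unit
  def unpersist(): Unit
}

// Caches `underlying` on construction and unpersists it after `budget`
// actions have been run through `withAction`.
class LimitedCache[T <: Cacheable](val underlying: T, budget: Int) {
  private var remaining = budget
  underlying.cache()

  // Run one "action" against the cached value; drop the cache
  // once the action budget is exhausted.
  def withAction[R](action: T => R): R = {
    val result = action(underlying)
    remaining -= 1
    if (remaining <= 0) underlying.unpersist()
    result
  }
}
```

The obvious limitation is that the caller must route every action through `withAction`; an action run directly on the underlying RDD would not be counted, which is exactly why this would be nicer as a built-in feature.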