Thanks Patrick and Matei for the clarification. I actually have to update some code now, as I was apparently relying on the fact that the output files are being re-used. Explains some edge-case behavior that I've seen.
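To make sure I've now got it straight, here is a minimal sketch of the distinction as I understand it (made-up data, run in a local PySpark shell where sc already exists):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
sums = pairs.reduceByKey(lambda v1, v2: v1 + v2)  # note: no cache() anywhere

sums.count()                                # runs the map stage and writes shuffle files
sums.filter(lambda kv: kv[1] > 1).count()   # map stage skipped: Spark reuses the
                                            # shuffle output already on disk

If I instead called sums.cache() before the first count(), the records themselves would be kept in memory, which also avoids re-reading the shuffle output over the network on the second pass.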
For me, at least, I read the guide, did some tests on fairly extensive RDD
dependency graphs, saw that tasks earlier in the dependency graphs were not
being regenerated, and assumed (very much incorrectly, I just found out!)
that it was because the RDDs themselves were being cached. I wonder if there
is a way to explain this distinction concisely in the programming guide. Or
maybe I'm the only one who went down this incorrect learning path :-)

Ethan

On Sun, May 4, 2014 at 12:05 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Yes, this happens as long as you use the same RDD. For example, say you do
> the following:
>
> data1 = sc.textFile(…).map(…).reduceByKey(…)
> data1.count()
> data1.filter(…).count()
>
> The first count() causes outputs of the map/reduce pair in there to be
> written out to shuffle files. Next time you do a count, on either this RDD
> or a child (e.g. after the filter), we notice that output files were
> already generated for this shuffle, so we don’t rerun the map stage. Note
> that the output does get read again over the network, which is kind of
> wasteful (if you really wanted to reuse this as quickly as possible you’d
> use cache()).
>
> Matei
>
> On May 3, 2014, at 8:44 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> Hey Matei,
> Not sure I understand that. These are 2 separate jobs. So the second job
> takes advantage of the fact that there is map output left somewhere on
> disk from the first job, and re-uses that?
>
>
> On Sat, May 3, 2014 at 8:29 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>
>> Hi Diana,
>>
>> Apart from these reasons, in a multi-stage job, Spark saves the map
>> output files from map stages to the filesystem, so it only needs to rerun
>> the last reduce stage. This is why you only saw one stage executing. These
>> files are saved for fault recovery, but they also speed up subsequent runs.
>>
>> Matei
>>
>> On May 3, 2014, at 5:21 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>
>> Ethan,
>>
>> What you said is actually not true; Spark won't cache RDDs unless you
>> ask it to.
>>
>> The observation here - that running the same job can speed up
>> substantially even without caching - is common. This is because other
>> components in the stack are performing caching and optimizations. Two
>> that can make a huge difference are:
>>
>> 1. The OS buffer cache, which will keep recently read disk blocks in
>> memory.
>> 2. The Java just-in-time compiler (JIT), which will use runtime
>> profiling to significantly speed up execution.
>>
>> These can make a huge difference if you are running the same job over
>> and over. And there are other things like the OS network stack increasing
>> TCP windows and so forth. These will all improve response time as a Spark
>> program executes.
>>
>>
>> On Fri, May 2, 2014 at 9:27 AM, Ethan Jewett <esjew...@gmail.com> wrote:
>>
>>> I believe Spark caches RDDs it has memory for, regardless of whether
>>> you actually call the 'cache' method on the RDD. The 'cache' method just
>>> tips off Spark that the RDD should have higher priority. At least, that
>>> is my experience, and it seems to correspond with your experience and
>>> with my recollection of other discussions on this topic on the list.
>>> However, going back and looking at the programming guide, this is not
>>> the way the cache/persist behavior is described. Does the guide need to
>>> be updated?
>>>
>>>
>>> On Fri, May 2, 2014 at 9:04 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>
>>>> I'm just Posty McPostalot this week, sorry folks!
>>>> :-)
>>>>
>>>> Anyway, another question today:
>>>> I have a bit of code that is pretty time-consuming (pasted at the end
>>>> of the message). It reads in a bunch of XML files, parses them, extracts
>>>> some data in a map, counts (using reduce), and then sorts. All stages
>>>> are executed when I do a final operation (take). The first stage is the
>>>> most expensive: on first run it takes 30s to a minute.
>>>>
>>>> I'm not caching anything.
>>>>
>>>> When I re-execute that take at the end, I expected it to re-execute
>>>> all the same stages and take approximately the same amount of time, but
>>>> it didn't. The second "take" executes only a single stage, which runs
>>>> very fast: the whole operation takes less than 1 second (down from 5
>>>> minutes!).
>>>>
>>>> While this is awesome (!) I don't understand it. If I'm not caching
>>>> data, why would I see such a marked performance improvement on
>>>> subsequent execution?
>>>>
>>>> (Or is this related to the known 0.9.1 bug about sortByKey executing
>>>> an action when it shouldn't?)
>>>>
>>>> Thanks,
>>>> Diana
>>>> <sparkdev_04-23_KEEP_FOR_BUILDS.png>
>>>>
>>>> # Load XML files containing device activation records.
>>>> # Find the most common device models activated.
>>>> import xml.etree.ElementTree as ElementTree
>>>>
>>>> # Given a partition containing multi-line XML, parse the contents.
>>>> # Return an iterator of activation Elements contained in the partition.
>>>> def getactivations(fileiterator):
>>>>     s = ''
>>>>     for i in fileiterator:
>>>>         s = s + str(i)
>>>>     filetree = ElementTree.fromstring(s)
>>>>     return filetree.getiterator('activation')
>>>>
>>>> # Get the model name from a device activation record
>>>> def getmodel(activation):
>>>>     return activation.find('model').text
>>>>
>>>> filename = "hdfs://localhost/user/training/activations/*.xml"
>>>>
>>>> # Parse each partition as a file into activation XML records
>>>> activations = sc.textFile(filename)
>>>> activationTrees = activations.mapPartitions(lambda xml: getactivations(xml))
>>>> models = activationTrees.map(lambda activation: getmodel(activation))
>>>>
>>>> # Count and sort activations by model
>>>> topmodels = models.map(lambda model: (model, 1))\
>>>>     .reduceByKey(lambda v1, v2: v1 + v2)\
>>>>     .map(lambda (model, count): (count, model))\
>>>>     .sortByKey(ascending=False)
>>>>
>>>> # Display the top 10 models
>>>> for (count, model) in topmodels.take(10):
>>>>     print "Model %s (%s)" % (model, count)
>>>>
>>>> # Repeat!
>>>> for (count, model) in topmodels.take(10):
>>>>     print "Model %s (%s)" % (model, count)
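P.S. In case it helps anyone following along: the cache() change Matei suggests would be a one-line addition to Diana's script. A sketch, reusing her models variable from above:

# Keep the computed (count, model) records in memory, so the repeated
# take(10) reads cached data instead of re-reading shuffle files.
topmodels = models.map(lambda model: (model, 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .map(lambda (model, count): (count, model))\
    .sortByKey(ascending=False)\
    .cache()

# The first take(10) computes and caches the partitions it touches;
# the repeated take(10) then reads them from memory.
for (count, model) in topmodels.take(10):
    print "Model %s (%s)" % (model, count)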