Perfect! That answers my question. I was under the impression that map and reduceByKey were Scala collection functions, but they aren't - they're RDD methods. Now it makes sense.
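For anyone else who hits the same confusion, here's a minimal sketch of the difference (assuming a spark-shell session, so a SparkContext named sc already exists): a Scala collection's map runs right away, while RDD.map and reduceByKey only record the transformation until an action asks for data.

    // Plain Scala collection: map runs eagerly, right here on the driver.
    val local = List(1, 2, 3).map(_ * 2)                    // local == List(2, 4, 6)

    // Spark RDDs: each transformation just returns a new RDD describing the step.
    val pairs   = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val mapped  = pairs.map { case (k, v) => (k, v * 2) }    // no work done yet
    val reduced = mapped.reduceByKey(_ + _)                  // still no work done
    val result  = reduced.collect()                          // action: DAG is evaluated now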
On Tue, Mar 11, 2014 at 10:38 PM, Ewen Cheslack-Postava <m...@ewencp.org> wrote:

> Ah, I see. You need to follow those other calls through to their
> implementations to see what ultimately happens. For example, the map()
> calls are to RDD.map, not one of Scala's built-in map methods for
> collections. The implementation looks like this:
>
>   /**
>    * Return a new RDD by applying a function to all elements of this RDD.
>    */
>   def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
>
> So once you get to one of the most primitive operations, like map(),
> you'll see the function actually generates a specific type of RDD
> representing the transformation. MappedRDD just stores a reference to the
> previous RDD and the function it needs to apply -- it doesn't actually
> contain any data. Of course the idea is that it *looks* like the normal
> map(), filter(), etc. in Scala, but it doesn't work the same way.
>
> By calling a bunch of these functions, you end up generating a graph,
> specifically a DAG, of RDDs. This graph describes all the steps needed to
> perform the operation, but no data. The final action, e.g. count() or
> collect(), that triggers computation is called on one of these RDDs. To get
> the value out, the Spark runtime/scheduler traverses the DAG starting from
> that RDD and triggers evaluation of any parent RDDs it needs that aren't
> computed and cached yet.
>
> Any future operations build on the same DAG as long as you use the same
> RDD objects and, if you used cache() or persist(), can reuse the same data
> after it has been computed the first time.
>
> -Ewen
>
> David Thomas <dt5434...@gmail.com>
> March 11, 2014 at 10:15 PM
>
> I think you misunderstood my question - I should have stated it better.
> I'm not saying it should be applied immediately, but I'm trying to
> understand how Spark achieves these lazy transformations. Maybe this is
> due to my ignorance of how Scala works, but when I look at the code, it
> seems the function is applied to the elements of the RDD when I call
> distinct - or is it not applied immediately? How does the returned RDD
> 'keep track of the operation'?
>
> Ewen Cheslack-Postava <m...@ewencp.org>
> March 11, 2014 at 10:06 PM
>
> You should probably be asking the opposite question: why do you think it
> *should* be applied immediately? Since the driver program hasn't requested
> any data back (distinct generates a new RDD, it doesn't return any data),
> there's no need to actually compute anything yet.
>
> As the documentation describes, if the call returns an RDD, it's
> transforming the data and will just keep track of the operation it
> eventually needs to perform. Only methods that return data back to the
> driver should trigger any computation.
>
> (The one known exception is sortByKey, which really should be lazy, but
> apparently uses an RDD.count call in its implementation:
> https://spark-project.atlassian.net/browse/SPARK-1021)
>
> David Thomas <dt5434...@gmail.com>
> March 11, 2014 at 9:49 PM
>
> For example, is the distinct() transformation lazy?
>
> When I look at the Spark source code, distinct applies a map -> reduceByKey
> -> map chain to the RDD elements. Why is this lazy? Won't the functions be
> applied immediately to the elements of the RDD when I call someRDD.distinct?
>
>   /**
>    * Return a new RDD containing the distinct elements in this RDD.
>    */
>   def distinct(numPartitions: Int): RDD[T] =
>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>
>   /**
>    * Return a new RDD containing the distinct elements in this RDD.
>    */
>   def distinct(): RDD[T] = distinct(partitions.size)
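As a quick sanity check of the laziness discussed above (again a sketch, assuming a spark-shell session with a SparkContext named sc): toDebugString prints the lineage that distinct builds - the map -> reduceByKey -> map chain - without running a job, and only the final action actually computes anything.

    val words   = sc.parallelize(Seq("a", "b", "a", "c", "b"))
    val uniques = words.distinct()                 // returns immediately; only builds lineage

    // Shows the chain of parent RDDs without triggering any computation.
    println(uniques.toDebugString)

    // Only this action runs a job and materializes the result.
    println(uniques.collect().sorted.mkString(", "))   // a, b, c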