The other thing to note here is that Spark SQL defensively copies rows when we switch into user code. This probably explains the difference between 1 & 2.
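Here is a minimal Scala sketch of why such a copy is needed. It assumes the engine reuses one mutable row object and overwrites its fields for every record. `MutableRow`, `reusingIterator` and `demo` are hypothetical illustrations, not Spark APIs, and Spark's real row classes differ:

```scala
// Hypothetical mutable row, loosely modeled on the reusable row objects
// Spark SQL uses internally; this is NOT a Spark class.
final class MutableRow(var hour: Int, var sales: Double) {
  // Returns an independent snapshot of the current field values.
  def copy(): MutableRow = new MutableRow(hour, sales)
}

object RowReuse {
  // Hypothetical operator output: hands out the SAME MutableRow on every
  // next() call, overwriting its fields instead of allocating a new row.
  def reusingIterator(data: Seq[(Int, Double)]): Iterator[MutableRow] = {
    val row = new MutableRow(0, 0.0)
    data.iterator.map { case (h, s) =>
      row.hour = h
      row.sales = s
      row
    }
  }

  // Returns the hours observed when user code buffers rows
  // (a) without copying and (b) with a defensive copy per row.
  def demo(): (List[Int], List[Int]) = {
    val data = Seq((1, 10.0), (2, 20.0))
    val aliased = reusingIterator(data).toList              // same object twice
    val copied = reusingIterator(data).map(_.copy()).toList // two snapshots
    (aliased.map(_.hour), copied.map(_.hour))
  }
}
```

`demo()` returns `(List(2, 2), List(1, 2))`. Without a copy, every buffered element aliases the last values written. With a copy the results are correct, but each row costs an extra allocation.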
The difference between 1 & 3 is likely the cost of decompressing the column buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian wrote:
> Hey Nathan,
>
> Thanks for sharing, this is a very interesting post :) My comments are
> inlined below.
>
> Cheng
>
> On 1/7/15 11:53 AM, Nathan McCarthy wrote:
> > Hi,
> >
> > I’m trying to use a combination of SparkSQL and 'normal' Spark/Scala via
> > rdd.mapPartitions(…). Using the latest release 1.2.0.
> >
> > Simple example; load up some sample data from parquet on HDFS (about
> > 380m rows, 10 columns) on a 7 node cluster.
> >
> >   val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
> >   t.registerTempTable("test1")
> >   sqlC.cacheTable("test1")
> >
> > Now let's do some operations on it; I want the total sales & quantities
> > sold for each hour in the day so I choose 3 out of the 10 possible
> > columns...
> >
> >   sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)
> >
> > After the table has been 100% cached in memory, this takes around 11
> > seconds.
> >
> > Let's do the same thing but via a MapPartitions call (this isn’t
> > production ready code but gets the job done).
> >
> >   val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
> >   try2.mapPartitions { case hrs =>
> >     val qtySum = new Array[Double](24)
> >     val salesSum = new Array[Double](24)
> >
> >     for (r <- hrs) {
> >       val hr = r.getInt(0)
> >       qtySum(hr) += r.getDouble(1)
> >       salesSum(hr) += r.getDouble(2)
> >     }
> >     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
> >   }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
>
> I believe the evil thing that makes this snippet much slower is the
> for-loop. According to my early benchmark done with Scala 2.9, a for-loop can
> be orders of magnitude slower than a simple while-loop, especially when the
> body of the loop only does something as trivial as this case.
> The reason is that a Scala for-loop is translated into corresponding
> foreach/map/flatMap/withFilter function calls. And that's exactly why Spark
> SQL tries to avoid for-loops or any other functional style code in critical
> paths (where every row is touched); we also use reusable mutable row
> objects instead of the immutable version to improve performance. You may
> check HiveTableScan, ParquetTableScan, InMemoryColumnarTableScan etc. for
> reference. Also, the `sum` function calls in your SQL code are translated
> into `o.a.s.s.execution.Aggregate` operators, which also use imperative
> while-loops and reusable mutable rows.
>
> Another thing to notice is that the `hrs` iterator physically points to the
> underlying in-memory columnar byte buffers, and the `for (r <- hrs) { ... }`
> loop actually decompresses and extracts values from the required byte
> buffers (this is the "unwrapping" process you mentioned below).
>
> > Now this takes around ~49 seconds… Even though the test1 table is 100%
> > cached. The number of partitions remains the same…
> >
> > Now if I create a simple RDD of a case class HourSum(hour: Int, qty: Double,
> > sales: Double)
> >
> > Convert the SchemaRDD;
> >
> >   val rdd = sqlC.sql("select * from test1").map { r => HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()
> >   // cache all the data
> >   rdd.count()
> >
> > Then run basically the same MapPartitions query;
> >
> >   rdd.mapPartitions { case hrs =>
> >     val qtySum = new Array[Double](24)
> >     val salesSum = new Array[Double](24)
> >
> >     for (r <- hrs) {
> >       val hr = r.hour
> >       qtySum(hr) += r.qty
> >       salesSum(hr) += r.sales
> >     }
> >     (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
> >   }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
> >
> > This takes around 1.5 seconds! Albeit the memory footprint is much
> > larger.
>
> I guess this 1.5 seconds doesn't include the time spent on caching the
> simple RDD?
> As I've explained above, in the first `mapPartitions` style
> snippet, columnar byte buffer unwrapping happens within the `mapPartitions`
> call. However, in this version, the unwrapping process happens when the
> `rdd.count()` action is performed. At that point, all values of all columns
> are extracted from the underlying byte buffers, and the portion of data you
> need is then manually selected and transformed into the simple case class
> RDD via the `map` call.
>
> If you include the time spent on caching the simple case class RDD, it should
> be even slower than the first `mapPartitions` version.
>
> > My thinking is that because SparkSQL does store things in a columnar
> > format, there is some unwrapping to be done out of the column array buffers
> > which takes time, and for some reason this just takes longer when I switch
> > out to map partitions (maybe it's unwrapping the entire row, even though I’m
> > using just a subset of columns, or maybe there is some object
> > creation/autoboxing going on when calling getInt or getDouble)…
> >
> > I’ve tried simpler cases too, like just summing sales. Running sum via
> > SQL is fast (4.7 seconds), running a mapPartition sum on a double RDD is
> > even faster (2.6 seconds). But MapPartitions on the SchemaRDD;
> >
> >   sqlC.sql("select SalesInclGST from test1").mapPartitions(iter => Iterator(iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)))).sum
> >
> > takes a long time (33 seconds). In all these examples everything is
> > fully cached in memory. And yes for these kinds of operations I can use
> > SQL, but for more complex queries I’d much rather be using a combo of
> > SparkSQL to select the data (so I get nice things like Parquet pushdowns
> > etc.) & functional Scala!
>
> Again, unfortunately, functional style code like `Iterator.sum` and
> `Iterator.foldLeft` can be really slow on critical paths.
>
> > I think I’m doing something dumb… Is there something I should be doing
> > to get faster performance on MapPartitions on SchemaRDDs?
> > Is there some unwrapping going on in the background that catalyst does in
> > a smart way that I’m missing?
>
> It makes sense that people use both Spark SQL and Spark core, especially
> when Spark SQL lacks features users need (like window functions, for now).
> The suggestion here is, if you really care about performance (more than
> code readability and maintenance cost), then avoid immutable, functional
> code whenever possible on any critical paths...
>
> > Cheers,
> > ~N
> >
> > Nathan McCarthy
> > QUANTIUM
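The following rough Scala sketch applies Cheng's advice to the hourly aggregation from the first `mapPartitions` snippet. It uses while-loops on the per-row path in place of the for-comprehension and `foldLeft`. `SaleRow`, `HourlyTotals`, `sumByHour` and `sumSales` are hypothetical names, and the case class is only a stand-in for the SchemaRDD row. Against a real Spark 1.2 row, the loop would read `r.getInt(0)`, `r.getDouble(1)` and `r.getDouble(2)` instead. The sketch makes no claim about the resulting timings.

```scala
// Hypothetical stand-in for one row of "select Hour, ItemQty, Sales from test1";
// not a Spark class. On a Spark 1.2 SchemaRDD row you would call
// r.getInt(0), r.getDouble(1) and r.getDouble(2) instead of these fields.
final case class SaleRow(hour: Int, qty: Double, sales: Double)

object HourlyTotals {
  // Per-partition hourly totals. The while-loop avoids the
  // Iterator.foreach + closure call that `for (r <- hrs) { ... }` desugars to.
  def sumByHour(rows: Iterator[SaleRow]): Iterator[(Int, (Double, Double))] = {
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)
    while (rows.hasNext) {
      val r = rows.next()
      qtySum(r.hour) += r.qty
      salesSum(r.hour) += r.sales
    }
    // Runs once per partition (24 elements), so functional style is harmless here.
    // Emits (hour, (sales, qty)), the same shape as
    // (salesSum zip qtySum).zipWithIndex.map(_.swap) in the original snippet.
    Iterator.tabulate(24)(h => (h, (salesSum(h), qtySum(h))))
  }

  // While-loop replacement for iter.foldLeft(0.0)((t, r) => t + r.getDouble(0)).
  def sumSales(rows: Iterator[SaleRow]): Double = {
    var total = 0.0
    while (rows.hasNext) {
      total += rows.next().sales
    }
    total
  }
}
```

The output shape matches the original, so the `reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))` step can stay as it was. Only the per-row loop changes. The tail that builds the 24 result pairs is left functional on purpose, because it is not on the per-row critical path.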
