Cheers,
~N
From: Michael Armbrust <[email protected]>
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian <[email protected]>
Cc: Nathan <[email protected]>, "[email protected]" <[email protected]>
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance
issues - columnar formats?
The other thing to note here is that Spark SQL defensively copies
rows when we switch into user code. This probably explains the
difference between 1 & 2.
The difference between 1 & 3 is likely the cost of decompressing the
column buffers vs. accessing a bunch of uncompressed primitive objects.
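A minimal sketch of why a defensive copy is needed once rows are handed to user code, assuming the producer overwrites one mutable object per row. `MutableRow`, `DefensiveCopyDemo` and `scan` are hypothetical names made up for illustration; this is not Spark's actual row class or scan operator.

```scala
// Hypothetical stand-in for a reusable mutable row; not Spark's Row class.
final class MutableRow(var hour: Int, var sales: Double) {
  def copy(): MutableRow = new MutableRow(hour, sales)
}

object DefensiveCopyDemo {
  // Produces rows by overwriting a single shared object for every element.
  def scan(data: Seq[(Int, Double)]): Iterator[MutableRow] = {
    val shared = new MutableRow(0, 0.0)
    data.iterator.map { case (h, s) =>
      shared.hour = h
      shared.sales = s
      shared
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq((1, 10.0), (2, 20.0), (3, 30.0))
    // Without copying, every retained reference points at the same object.
    val aliased = scan(data).toArray
    // With one copy per row, each element keeps its own values.
    val copied = scan(data).map(_.copy()).toArray
    println(aliased.map(_.hour).mkString(","))  // 3,3,3
    println(copied.map(_.hour).mkString(","))   // 1,2,3
  }
}
```

The copy costs one allocation per row, which is cheap for a single row but adds up when every row of a large table passes through it.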
On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian <[email protected]> wrote:
Hey Nathan,
Thanks for sharing, this is a very interesting post :) My
comments are inlined below.
Cheng
On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,
I’m trying to use a combination of SparkSQL and ‘normal’
Spark/Scala via rdd.mapPartitions(…), using the latest release,
1.2.0.
A simple example: load some sample data from Parquet on HDFS
(about 380m rows, 10 columns) on a 7-node cluster.
val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
t.registerTempTable("test1")
sqlC.cacheTable("test1")
Now let's do some operations on it. I want the total sales and
quantities sold for each hour of the day, so I select 3 of
the 10 available columns:
sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group
by Hour").collect().foreach(println)
After the table has been 100% cached in memory, this takes
around 11 seconds.
Let's do the same thing via a mapPartitions call (this isn’t
production-ready code, but it gets the job done).
val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
try2.mapPartitions { case hrs =>
val qtySum = new Array[Double](24)
val salesSum = new Array[Double](24)
for(r <- hrs) {
val hr = r.getInt(0)
qtySum(hr) += r.getDouble(1)
salesSum(hr) += r.getDouble(2)
}
(salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 +
b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is
the for-loop. According to an early benchmark I did with Scala
2.9, a for-loop can be orders of magnitude slower than a simple
while-loop, especially when the body of the loop does something
as trivial as in this case. The reason is that a Scala for-loop
is translated into the corresponding
foreach/map/flatMap/withFilter function calls. That is exactly
why Spark SQL tries to avoid for-loops and other functional-style
code in critical paths (where every row is touched). We also use
reusable mutable row objects instead of the immutable version to
improve performance. You may check HiveTableScan,
ParquetTableScan, InMemoryColumnarTableScan etc. for reference.
Also, the `sum` function calls in your SQL code are translated
into `o.a.s.s.execution.Aggregate` operators, which also use
imperative while-loops and reusable mutable rows.
Another thing to notice is that the `hrs` iterator physically
points to the underlying in-memory columnar byte buffers, and the
`for (r <- hrs) { ... }` loop actually decompresses and extracts
values from the required byte buffers (this is the "unwrapping"
process you mentioned below).
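As a rough illustration of the while-loop style described above, here is a sketch of the per-partition aggregation written without a for-comprehension. `HourlyTotals` and `sumPartition` are hypothetical names, and the `(hour, qty, sales)` tuple input is only a stand-in so the sketch runs without Spark; it is not taken from Spark SQL's own operators, and whether it closes the gap on a real cluster would need to be measured.

```scala
object HourlyTotals {
  // Sums quantity and sales per hour for one partition using a while-loop
  // and primitive arrays, so no closure is invoked per row.
  // Input tuples are (hour, qty, sales); hour is assumed to be in 0..23.
  def sumPartition(
      rows: Iterator[(Int, Double, Double)]): Iterator[(Int, (Double, Double))] = {
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)
    while (rows.hasNext) {
      val r = rows.next()
      qtySum(r._1) += r._2
      salesSum(r._1) += r._3
    }
    // Build the 24 (hour, (sales, qty)) pairs once per partition; not a hot path.
    Iterator.tabulate(24)(h => (h, (salesSum(h), qtySum(h))))
  }
}
```

In a job, the same loop body would sit inside the `mapPartitions` closure, with the row accessors `r.getInt(0)`, `r.getDouble(1)` and `r.getDouble(2)` in place of the tuple fields.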
This now takes around 49 seconds, even though the test1 table is
100% cached. The number of partitions remains the same.
Now if I create a simple RDD of a case class HourSum(hour: Int,
qty: Double, sales: Double) and convert the SchemaRDD:
val rdd = sqlC.sql("select * from test1").map{ r =>
HourSum(r.getInt(1), r.getDouble(7), r.getDouble(8)) }.cache()
//cache all the data
rdd.count()
Then run basically the same MapPartitions query;
rdd.mapPartitions { case hrs =>
val qtySum = new Array[Double](24)
val salesSum = new Array[Double](24)
for(r <- hrs) {
val hr = r.hour
qtySum(hr) += r.qty
salesSum(hr) += r.sales
}
(salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
}.reduceByKey((a,b) => (a._1 + b._1, a._2 +
b._2)).collect().foreach(println)
This takes around 1.5 seconds! Albeit the memory footprint is
much larger.
I guess this 1.5 seconds doesn't include the time spent on
caching the simple RDD? As I've explained above, in the first
`mapPartitions` style snippet, columnar byte buffer unwrapping
happens within the `mapPartitions` call. However, in this
version, the unwrapping process happens when the `rdd.count()`
action is performed. At that point, all values of all columns are
extracted from the underlying byte buffers, and the portion of the
data you need is then manually selected and transformed into the
simple case class RDD via the `map` call.
If you include time spent on caching the simple case class RDD,
it should be even slower than the first `mapPartitions` version.
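One way to make that comparison concrete is to time the caching step and the query separately and then add them up. The helper below is a minimal sketch; `Timing` and `timed` are hypothetical names, and the two workloads are plain-Scala stand-ins for `rdd.count()` (which materialises the cache) and the `mapPartitions` query that follows it.

```scala
object Timing {
  // Runs `body` once and returns its result with the elapsed wall-clock seconds.
  def timed[A](body: => A): (A, Double) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1e9)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the step that builds and caches the data.
    val (cached, buildSecs) = timed { Array.tabulate(1000000)(i => i.toDouble) }
    // Stand-in for the query that runs against the cached data.
    val (total, querySecs) = timed {
      var sum = 0.0
      var i = 0
      while (i < cached.length) { sum += cached(i); i += 1 }
      sum
    }
    println(s"total = $total")
    println(f"build: $buildSecs%.3f s, query: $querySecs%.3f s, " +
      f"end to end: ${buildSecs + querySecs}%.3f s")
  }
}
```

A single run like this is noisy on the JVM; repeating each measurement a few times after warm-up gives steadier numbers.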
My thinking is that because SparkSQL stores things in a
columnar format, there is some unwrapping to be done out of the
column array buffers, which takes time, and for some reason this
just takes longer when I switch out to mapPartitions (maybe it's
unwrapping the entire row, even though I’m using just a subset
of columns, or maybe there is some object creation/autoboxing
going on when calling getInt or getDouble)…
I’ve tried simpler cases too, like just summing sales. Running
sum via SQL is fast (4.7 seconds), running a mapPartition sum on
a double RDD is even faster (2.6 seconds). But MapPartitions on
the SchemaRDD;
sqlC.sql("select SalesInclGST from test1").mapPartitions(iter
=> Iterator(iter.foldLeft(0.0)((t,r) => t+r.getDouble(0)))).sum
takes a long time (33 seconds). In all these examples
everything is fully cached in memory. And yes for these kinds of
operations I can use SQL, but for more complex queries I’d much
rather be using a combo of SparkSQL to select the data (so I get
nice things like Parquet pushdowns etc.) & functional Scala!
Again, unfortunately, functional style code like `Iterator.sum`
and `Iterator.foldLeft` can be really slow on critical paths.
I think I’m doing something dumb… Is there something I should be
doing to get faster performance on MapPartitions on SchemaRDDs?
Is there some unwrapping going on in the background that
catalyst does in a smart way that I’m missing?
It makes sense that people use both Spark SQL and Spark core,
especially when Spark SQL lacks features users need (like window
functions, for now). The suggestion here is: if you really care
about performance (more than code readability and maintenance
cost), then avoid immutable, functional code whenever possible on
any critical paths...
Cheers,
~N
Nathan McCarthy
QUANTIUM
Level 25, 8 Chifley, 8-12 Chifley Square
Sydney NSW 2000
T: +61 2 8224 8922
F: +61 2 9292 6444
W: quantium.com.au <http://www.quantium.com.au>
------------------------------------------------------------------------
linkedin.com/company/quantium
<http://www.linkedin.com/company/quantium>
facebook.com/QuantiumAustralia
<http://www.facebook.com/QuantiumAustralia>
twitter.com/QuantiumAU <http://www.twitter.com/QuantiumAU>
The contents of this email, including attachments, may be
confidential information. If you are not the intended recipient,
any use, disclosure or copying of the information is
unauthorised. If you have received this email in error, we would
be grateful if you would notify us immediately by email reply,
phone (+ 61 2 9292 6400) or fax (+ 61 2 9292 6444) and delete the
message from your system.