Thanks for the experiments and analysis! I think Michael already submitted a patch that avoids scanning all columns for count(*) or count(1).
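
Until that lands, the workaround is essentially the hand-tuned query you already found: project one narrow column before counting, so the Parquet scan only materializes that column instead of reassembling all 115. A rough sketch against the 1.0 API (the path and column names are just placeholders taken from your mail):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext._  // Symbol -> attribute DSL used by where/select below

    val table = sqlContext.parquetFile("/path/to/parquet")

    // .count() on the raw SchemaRDD reassembles every column.
    val slowCount = table.where('COL === 1234567890).count

    // Projecting a single narrow column first keeps the scan column-pruned.
    val fastCount = table.where('COL === 1234567890).select('IDCOL).count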
On Mon, May 12, 2014 at 9:46 PM, Andrew Ash <and...@andrewash.com> wrote:
> Hi Spark devs,
>
> First of all, huge congrats on the parquet integration with SparkSQL! This
> is an incredible direction forward and something I can see being very
> broadly useful.
>
> I was doing some preliminary tests to see how it works with one of my
> workflows, and wanted to share some numbers that people might want to know
> about.
>
> I also wanted to point out that .count() doesn't seem integrated with the
> rest of the optimization framework, and some big gains could be possible.
>
> So, the numbers:
>
> I took a table extracted from a SQL database and stored in HDFS:
>
> - 115 columns (several always-empty, mostly strings, some enums, some numbers)
> - 253,887,080 rows
> - 182,150,295,881 bytes (raw uncompressed)
> - 42,826,820,222 bytes (lzo compressed with .index file)
>
> And I converted it to Parquet using SparkSQL's SchemaRDD.saveAsParquet() call:
>
> - Converting from .lzo in HDFS to .parquet in HDFS took 635s using 42
>   cores across 4 machines
> - 17,517,922,117 bytes (parquet per SparkSQL defaults)
>
> So storing in parquet format vs lzo compresses the data down to less than
> 50% of the .lzo size, and under 10% of the raw uncompressed size. Nice!
>
> I then did some basic interactions on it:
>
> *Row count*
>
> - LZO
>   - lzoFile("/path/to/lzo").count
>   - 31.632305953s
> - Parquet
>   - sqlContext.parquetFile("/path/to/parquet").count
>   - 289.129487003s
>
> Reassembling rows from the separate column storage is clearly really
> expensive. Median task length is 33s vs 4s, and of that 33s in each task
> (319 tasks total) about 1.75 seconds are spent in GC (inefficient object
> allocation?)
>
> *Count number of rows with a particular key:*
>
> - LZO
>   - lzoFile("/path/to/lzo").filter(_.split("\\|")(0) == "1234567890").count
>   - 73.988897511s
> - Parquet
>   - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).count
>   - 293.410470418s
> - Parquet (hand-tuned to count on just one column)
>   - sqlContext.parquetFile("/path/to/parquet").where('COL === 1234567890).select('IDCOL).count
>   - 1.160449187s
>
> It looks like currently the .count() on parquet is handled incredibly
> inefficiently and all the columns are materialized. But if I select just
> that relevant column and then count, then the column-oriented storage of
> Parquet really shines.
>
> There ought to be a potential optimization here such that a .count() on a
> SchemaRDD backed by Parquet doesn't require re-assembling the rows, as
> that's expensive. I don't think .count() is handled specially in
> SchemaRDDs, but it seems ripe for optimization.
>
> *Count number of distinct values in a column*
>
> - LZO
>   - lzoFile("/path/to/lzo").map(sel(0)).distinct.count
>   - 115.582916866s
> - Parquet
>   - sqlContext.parquetFile("/path/to/parquet").select('COL).distinct.count
>   - 16.839004826s
>
> It turns out column selectivity is very useful! I'm guessing that if I
> could get byte counts read out of HDFS, that would just about match up with
> the difference in read times.
>
> Any thoughts on how to embed the knowledge of my hand-tuned additional
> .select('IDCOL) into Catalyst?
>
> Thanks again for all the hard work and prep for the 1.0 release!
>
> Andrew
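
On your last question about teaching Catalyst the .select('IDCOL) trick: once count(*)/count(1) is pruned automatically, expressing the aggregate through the query layer rather than RDD.count() should give you that behavior without hand-tuning. A rough sketch (the table name "events" is just a placeholder):

    val parquetTable = sqlContext.parquetFile("/path/to/parquet")
    parquetTable.registerAsTable("events")  // placeholder table name

    // COUNT(1) is planned by Catalyst, which can then prune the scan down to
    // a single column instead of reassembling all 115.
    sqlContext.sql("SELECT COUNT(1) FROM events WHERE COL = 1234567890").collect()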