We are working with use cases where we need to do batch processing on a large
number (hundreds of thousands) of Parquet files. The processing is quite
similar per file. There are many aggregates that are very SQL-friendly
(computing averages, maxima, and minima on single columns with some
selection criteria), and there is also some more advanced time-series
processing (continuous wavelet transforms and the like). This all seems
like a good use case for Spark.
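For concreteness, a typical per-file SQL-style aggregate looks roughly
like the sketch below (the path and the "status" / "sensor_value" column
names are made up for illustration):

import org.apache.spark.sql.functions.{avg, max, min}

// Illustrative sketch only: the path and column names are hypothetical.
val df = sqlContext.read.parquet("/data/example.parquet")
val stats = df.filter(df("status") === "OK")
  .agg(avg("sensor_value"), max("sensor_value"), min("sensor_value"))
  .collect()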
But I'm having performance problems. Let's look at something very simple:
checking whether the Parquet files are readable at all.
Code that seems natural but doesn't work:
import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)
My understanding is that this doesn't work because sqlContext can't be
used inside a transformation like map (or inside an action); it only makes
sense on the driver. When the closure is shipped to the executors,
sqlContext becomes a null reference, so every read fails.
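If I avoid sqlContext inside the closure entirely, the check itself could
run on the executors. A rough, untested sketch of that idea, using the
Hadoop FileSystem API to look for the leading Parquet magic bytes ("PAR1"),
and assuming the paths in file_list.txt are fully qualified (e.g.
hdfs://...):

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fileList = sc.textFile("file_list.txt")
val readable = fileList.mapPartitions { paths =>
  // Everything here is created on the executor, so nothing from the
  // driver (in particular sqlContext) is captured in the closure.
  val conf = new Configuration()
  paths.filter { p =>
    Try {
      val fs = FileSystem.get(new java.net.URI(p), conf)
      val in = fs.open(new Path(p))
      val magic = new Array[Byte](4)
      try in.readFully(magic) finally in.close()
      // Parquet files start (and end) with the 4-byte magic "PAR1".
      new String(magic, "US-ASCII") == "PAR1"
    }.getOrElse(false)
  }
}

That only verifies the file header rather than everything sqlContext
checks when it reads the footer, so for the real check I have been keeping
the read on the driver.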
Code that works:
import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(x => x._1)
This works because the collect() brings everything back to the driver,
where the sqlContext reference is valid, so the reads succeed. But it is
slow. I'm running in yarn-client mode on a 6-node cluster with 17
executors (40 GB of RAM on the driver, 19 GB per executor), and it takes
about 1 minute for 100 Parquet files, which is far too long given that we
need to do this across hundreds of thousands of files.
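As an aside, for the aggregates themselves (as opposed to this readability
check) I assume the per-file loop could be avoided by handing all the
paths to a single read call, roughly as sketched below, assuming the files
share a schema and are all readable, and with "sensor_value" again just an
illustrative column name:

import org.apache.spark.sql.functions.avg

// Sketch: pass every path to one read so Spark distributes the scan.
// A single unreadable file would fail the whole job, which is exactly
// why the readability check matters.
val paths = sc.textFile("file_list.txt").collect()
val all = sqlContext.read.parquet(paths: _*)
val overallAvg = all.agg(avg("sensor_value")).collect()

That gives one aggregate over all the files rather than one per file,
though, and it doesn't help with the readability check itself.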
I realize it is possible to parallelize after the read:
import scala.util.{Try, Success, Failure}

val parquetFiles = sc.textFile("file_list.txt")
val intermediate_successes = parquetFiles.collect()
  .map(x => (x, Try(sqlContext.read.parquet(x))))

val dist_successes = sc.parallelize(intermediate_successes)
val successes = dist_successes.filter(_._2.isSuccess).map(x => x._1)
But this does not improve performance substantially; the bottleneck seems
to be that the reads themselves still happen sequentially on the driver.
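One thing I have not tried yet is keeping the reads on the driver but
issuing them from several threads at once via Scala parallel collections.
A sketch (untested, and it assumes concurrent sqlContext.read calls from
driver threads are safe):

import scala.util.Try

// Sketch: .par runs the map on a driver-side thread pool, so the
// per-file read calls overlap instead of running strictly one after
// another. This is still driver-only work.
val fileList = sc.textFile("file_list.txt").collect()
val successes = fileList.par
  .map(x => (x, Try(sqlContext.read.parquet(x))))
  .filter(_._2.isSuccess)
  .map(_._1)
  .toArray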
Is there a better way to do this?
Thanks,
Jordan