Hi,

2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:
> Hi,
>
> We have a load of Avro data coming into our data systems in the form of
> relatively small files, which we're merging into larger Parquet files with
> Spark. I've been following the docs and the approach I'm taking seemed
> fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
> not the most optimal approach.
>
> I was wondering if anyone on this list might have some advice to make
> this job as efficient as possible. Here's some code:
>
> DataFrame dfInput = sqlContext.load(inputPaths.get(0),
>     "com.databricks.spark.avro");
> long totalSize = getDirSize(inputPaths.get(0));
>
> for (int i = 1; i < inputPaths.size(); ++i) {
>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
>         "com.databricks.spark.avro"));
>     totalSize += getDirSize(inputPaths.get(i));
> }
>
> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
> DataFrame outputFrame;
>
> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
> // the synchronized block below. Suggestions welcome here too! :-)
> synchronized (this) {
>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
> }
>
> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>
> Here are some things bothering me:
>
> - Conversion to an RDD and back again so that we can use coalesce() to
>   reduce the number of partitions. This is because we read that
>   repartition() is not as efficient as coalesce(), and local micro
>   benchmarks seemed to somewhat confirm that this was faster. Is this
>   really a good idea though? Should we be doing something else?

Repartition uses coalesce, but with a forced shuffle step. It's just a
shortcut for coalesce(xxx, true). Doing a coalesce sounds correct, I'd do
the same :) Note that if you add the shuffle step, your partitions should
be better balanced.

> - Usage of unionAll() - this is the only way I could find to join the
>   separate data sets into a single data frame to save as Parquet. Is
>   there a better way?

When using the input formats directly, you can use
FileInputFormat.addInputPath; it should perform at least as well as the
union.

> - Do I need to be using the DataFrame API at all? I'm not querying any
>   data here, so the nice API for SQL-like transformations of the data
>   isn't being used. The DataFrame API just seemed like the path of least
>   resistance for working with Avro and Parquet. Would there be any
>   advantage to using hadoopRDD() with the appropriate Input/Output
>   formats?

Using the input/output formats directly sounds viable. But the snippet you
show seems clean enough, and I am not sure there would be much value in
making something (maybe) slightly faster but harder to understand.

Eugen

> Any advice or tips greatly appreciated!
>
>
> James.
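
To make the coalesce/repartition point concrete, here's a rough sketch of
the two variants, reusing dfInput, targetPartitions and sqlContext from the
snippet above (untested, just to illustrate the difference):

    import org.apache.spark.rdd.RDD;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;

    // Option 1: coalesce without a shuffle (what the snippet above does).
    // Cheap, but output partition sizes depend on how the existing
    // partitions happen to be glued together.
    RDD<Row> noShuffle = dfInput.rdd().coalesce(targetPartitions, false, null);

    // Option 2: coalesce with the shuffle step. repartition(n) is just
    // coalesce(n, shuffle = true), so this is the repartition variant: it
    // pays for a full shuffle but gives evenly balanced output partitions.
    RDD<Row> withShuffle = dfInput.rdd().coalesce(targetPartitions, true, null);

    DataFrame outputFrame =
        sqlContext.createDataFrame(withShuffle, dfInput.schema());

Newer Spark versions also expose coalesce() directly on DataFrame, which
would let you drop the RDD round trip entirely if it's available to you.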
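
And for the FileInputFormat.addInputPath suggestion, a rough sketch of what
reading all the Avro paths through a single Hadoop input format could look
like (the class and method names here are just illustrative; this assumes
avro-mapred on the classpath and a JavaSparkContext named sc):

    import java.io.IOException;
    import java.util.List;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapreduce.AvroKeyInputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class AvroMultiPathRead {
        // Read every input directory through one AvroKeyInputFormat job
        // instead of building a DataFrame per path and unionAll-ing them.
        public static JavaPairRDD<AvroKey<GenericRecord>, NullWritable> readAll(
                JavaSparkContext sc, List<String> inputPaths) throws IOException {
            Job job = Job.getInstance();
            for (String path : inputPaths) {
                // Register all the paths on a single job configuration.
                FileInputFormat.addInputPath(job, new Path(path));
            }
            // Raw class literals make this an unchecked assignment, which is
            // fine for a sketch like this.
            return sc.newAPIHadoopRDD(job.getConfiguration(),
                    AvroKeyInputFormat.class, AvroKey.class, NullWritable.class);
        }
    }

You'd still have to handle the Parquet writing side yourself with this
approach, which is why the DataFrame snippet above may remain the simpler
option.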