Hi,

We have a load of Avro data coming into our data systems in the form of relatively small files, which we're merging into larger Parquet files with Spark. I've been following the docs, and the approach I'm taking seemed fairly obvious and pleasingly simple, but I'm wondering whether it's actually the best one.
I was wondering if anyone on this list might have some advice on making this job as efficient as possible. Here's some code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));
for (int i = 1; i < inputPaths.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

// Aim for roughly TARGET_SIZE_BYTES per output partition, with a floor of two.
int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);

DataFrame outputFrame;
// Note: HADOOP-10456 impacts us, as we're stuck on Hadoop 2.4.0 in EMR, hence
// the synchronized block below. Suggestions welcome here too! :-)
synchronized (this) {
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}
outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

- The conversion to an RDD and back again, just so we can use coalesce() to reduce the number of partitions. We did this because we'd read that repartition() is not as efficient as coalesce(), and local micro-benchmarks seemed to somewhat confirm it was faster. Is this really a good idea, though? Should we be doing something else? (There's a rough sketch of the alternative I have in mind in the P.S. below.)

- The use of unionAll() - this is the only way I could find to combine the separate data sets into a single data frame to save as Parquet. Is there a better way? (Also sketched in the P.S.)

- Do I need to be using the DataFrame API at all? I'm not querying any data here, so the nice API for SQL-like transformations goes unused; DataFrames just seemed like the path of least resistance for working with Avro and Parquet. Would there be any advantage to using hadoopRDD() with the appropriate Input/Output formats? (Again, see the P.S.)

Any advice or tips greatly appreciated!

James.
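P.S. To make the questions above concrete, here are the (entirely untested) alternatives I've been sketching. For the coalesce() question: if I've read the docs right, DataFrame gained its own coalesce(int) in Spark 1.4, which would let us drop the rdd()/createDataFrame() round trip:

// Untested sketch, assuming Spark 1.4+, where DataFrame.coalesce(int) exists
// and merges partitions without a shuffle, just like RDD.coalesce().
DataFrame coalesced = dfInput.coalesce(targetPartitions);
coalesced.save(outputPath, "parquet", SaveMode.Overwrite);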
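For the unionAll() question, the tidiest thing I've spotted would be handing every path to a single load and letting Spark do the combining, though I believe the varargs load(String...) only arrived on DataFrameReader in Spark 1.6:

// Untested sketch, assuming Spark 1.6+, where DataFrameReader.load(String...)
// accepts multiple paths; the per-path loads and the unionAll() loop go away.
DataFrame dfInput = sqlContext.read()
    .format("com.databricks.spark.avro")
    .load(inputPaths.toArray(new String[0]));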
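And for the last question, this is roughly the shape of the non-DataFrame route I'm weighing up, reading with AvroKeyInputFormat (from avro-mapred) and writing with AvroParquetOutputFormat (from parquet-avro). Completely untested; "sc" stands in for our JavaSparkContext and "schema" for our Avro schema:

// Untested sketch, assuming avro-mapred and parquet-avro are on the classpath.
Job job = Job.getInstance(sc.hadoopConfiguration());
AvroParquetOutputFormat.setSchema(job, schema); // "schema" = our Avro Schema
JavaPairRDD<AvroKey<GenericRecord>, NullWritable> records =
    sc.newAPIHadoopFile(String.join(",", inputPaths), AvroKeyInputFormat.class,
        AvroKey.class, NullWritable.class, job.getConfiguration());
records
    .coalesce(targetPartitions)
    .mapToPair(t -> new Tuple2<Void, GenericRecord>(null, t._1().datum()))
    .saveAsNewAPIHadoopFile(outputPath, Void.class, GenericRecord.class,
        AvroParquetOutputFormat.class, job.getConfiguration());

Whether that would actually buy us anything over the DataFrame version I honestly don't know - it mostly just looks like more code to me.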