Hi
2015-06-04 15:29 GMT+02:00 James Aley <[email protected]>:
> Hi,
>
> We have a load of Avro data coming into our data systems in the form of
> relatively small files, which we're merging into larger Parquet files with
> Spark. I've been following the docs and the approach I'm taking seemed
> fairly obvious, and pleasingly simple, but I'm wondering whether it's
> really the most efficient approach.
>
> I was wondering if anyone on this list might have some advice on making
> this job as efficient as possible. Here's some code:
>
> // Load the first Avro input and start tracking the total input size.
> DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
> long totalSize = getDirSize(inputPaths.get(0));
>
> // Union in the remaining inputs.
> for (int i = 1; i < inputPaths.size(); ++i) {
>     dfInput = dfInput.unionAll(
>         sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
>     totalSize += getDirSize(inputPaths.get(i));
> }
>
> // Aim for output files of roughly TARGET_SIZE_BYTES each, with at least 2 partitions.
> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
> DataFrame outputFrame;
>
> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
> // the synchronized block below. Suggestions welcome here too! :-)
> synchronized (this) {
>     // In Spark 1.x, RDD.coalesce takes an implicit Ordering as a trailing
>     // parameter, which Java callers must pass explicitly (null here).
>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
> }
>
> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
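The snippet relies on two names it does not define, getDirSize() and TARGET_SIZE_BYTES. A minimal sketch of the sizing step, assuming a local filesystem (on HDFS or S3 the real helper would presumably go through Hadoop's FileSystem API instead) and a hypothetical 128 MB target; both the helper body and the constant are illustrative, not the code from the original job:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Stream;

public class PartitionSizing {
    // Hypothetical target output size (~128 MB); the original value is not given.
    static final long TARGET_SIZE_BYTES = 128L * 1024 * 1024;

    // Hypothetical local-filesystem stand-in for getDirSize():
    // sums the sizes of all regular files under dir.
    static long getDirSize(String dir) {
        try (Stream<Path> files = Files.walk(Paths.get(dir))) {
            return files.filter(Files::isRegularFile)
                        .mapToLong(p -> {
                            try {
                                return Files.size(p);
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        })
                        .sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same arithmetic as the quoted snippet: integer division, floored at 2.
    static int targetPartitions(long totalSize, long targetSizeBytes) {
        return (int) Math.max(2L, totalSize / targetSizeBytes);
    }

    // Sum the input directories, then derive the partition count.
    static int targetPartitions(List<String> inputPaths) {
        long totalSize = 0;
        for (String path : inputPaths) {
            totalSize += getDirSize(path);
        }
        return targetPartitions(totalSize, TARGET_SIZE_BYTES);
    }

    public static void main(String[] args) {
        // 1 GB of input at a 128 MB target: 8 partitions.
        System.out.println(targetPartitions(1024L * 1024 * 1024, TARGET_SIZE_BYTES));
        // Small inputs still get the floor of 2.
        System.out.println(targetPartitions(10L * 1024 * 1024, TARGET_SIZE_BYTES));
    }
}
```

Because the division rounds down, for inputs of at least twice the target each partition holds at least TARGET_SIZE_BYTES of input. The Parquet files written from it may still end up a different size, since Parquet's encoding and compression differ from Avro's.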
>
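On the HADOOP-10456 workaround: synchronized (this) only excludes threads that are using the same instance of the enclosing class, so if each merge job runs as a separate instance, the block does not serialize them. A minimal sketch, assuming that setup, of a lock shared by all instances; MergeJob and CONF_LOCK are hypothetical names, and the sleep stands in for the coalesce/createDataFrame work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MergeJob {
    // One lock shared by every MergeJob instance in this JVM (hypothetical name).
    private static final Object CONF_LOCK = new Object();

    // Demo instrumentation: how many threads are inside the critical section at once.
    static final AtomicInteger inside = new AtomicInteger();
    static final AtomicInteger maxInside = new AtomicInteger();

    void buildOutputFrame() {
        // synchronized (this) would only exclude threads using *this* instance;
        // the static lock excludes all instances.
        synchronized (CONF_LOCK) {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(5); // stand-in for the Configuration-touching work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inside.decrementAndGet();
        }
    }

    // Runs one job per thread, each on its own MergeJob instance.
    static int runConcurrently(int jobs) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < jobs; i++) {
            MergeJob job = new MergeJob(); // separate instance per job, as assumed above
            threads.add(new Thread(job::buildOutputFrame));
        }
        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8)); // prints 1
    }
}
```

A static lock only serializes threads within one JVM (here, the driver); whether that covers the race described in HADOOP-10456 depends on where the affected Configuration is being modified.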
> Here are some things bothering me:
>
> - Converting to an RDD and back again so that we can use coalesce() to
> reduce the number of partitions. We did this because we read that
> repartition() is less efficient than coalesce(), and local microbenchmarks
> seemed to roughly confirm that coalesce() was faster. Is this really a good
> idea, though? Should we be doing something else?
>
Repartition uses coalesce but with a forced shuffle step; it's just a
shortcut for coalesce(numPartitions, true).
Using coalesce sounds right, I'd do the same :) Note that if you add the
shuffle step, your partitions should be better balanced.
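To illustrate the balancing point, here is a toy model in Java. It is not Spark's actual partition coalescer, which may group partitions differently (for example by data locality); it only shows the principle that without a shuffle, coalesce groups whole parent partitions, so one large input file leaves one large output partition, while a shuffle redistributes records. Sizes are record counts, and the class and method names are hypothetical:

```java
import java.util.Arrays;

public class CoalesceToy {
    // Toy model of coalesce(n, false): whole parent partitions are grouped
    // (here, contiguously) into n outputs. Records never move between
    // groups, so skewed inputs stay skewed.
    static long[] coalesceNoShuffle(long[] parentSizes, int n) {
        long[] out = new long[n];
        for (int i = 0; i < parentSizes.length; i++) {
            out[(int) ((long) i * n / parentSizes.length)] += parentSizes[i];
        }
        return out;
    }

    // Toy model of coalesce(n, true), i.e. repartition(n): every record is
    // redistributed. Spark spreads records only approximately evenly; this
    // model spreads them exactly (sizes differ by at most one record).
    static long[] coalesceWithShuffle(long[] parentSizes, int n) {
        long total = Arrays.stream(parentSizes).sum();
        long[] out = new long[n];
        for (int j = 0; j < n; j++) {
            out[j] = total / n + (j < total % n ? 1 : 0);
        }
        return out;
    }

    public static void main(String[] args) {
        // Six input partitions, one much larger than the rest, merged into 2.
        long[] parents = {10, 10, 10, 10, 10, 500};
        System.out.println(Arrays.toString(coalesceNoShuffle(parents, 2)));   // [30, 520]
        System.out.println(Arrays.toString(coalesceWithShuffle(parents, 2))); // [275, 275]
    }
}
```

This is the trade-off in the reply: coalesce(n, false) avoids moving data across the network but keeps any skew, while repartition(n) pays for a shuffle to get even partitions.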
>
> - Using unionAll(): this is the only way I could find to combine the
> separate data sets into a single data frame to save as Parquet. Is there a
> better way?
>
When using the input formats directly, you can call
FileInputFormat.addInputPath once per input instead; it should perform at
least as well as union.
>
> - Do I need to be using the DataFrame API at all? I'm not querying any
> data here, so the nice API for SQL-like transformations of the data isn't
> being used. The DataFrame API just seemed like the path of least resistance
> for working with Avro and Parquet. Would there be any advantage to using
> hadoopRDD() with the appropriate Input/Output formats?
>
Using the input/output formats directly sounds viable, but the snippet you
show seems clean enough, and I'm not sure there would be much value in
making something (maybe) slightly faster but harder to understand.
Eugen

> Any advice or tips greatly appreciated!
>
>
> James.