Hi

2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:

> Hi,
>
> We have a load of Avro data coming into our data systems in the form of
> relatively small files, which we're merging into larger Parquet files with
> Spark. I've been following the docs and the approach I'm taking seemed
> fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
> not the best approach.
>
> I was wondering if anyone on this list might have some advice on making
> this job as efficient as possible. Here's some code:
>
> DataFrame dfInput = sqlContext.load(inputPaths.get(0),
> "com.databricks.spark.avro");
> long totalSize = getDirSize(inputPaths.get(0));
>
> for (int i = 1; i < inputPaths.size(); ++i) {
>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
> "com.databricks.spark.avro"));
>     totalSize += getDirSize(inputPaths.get(i));
> }
>
> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
> DataFrame outputFrame;
>
> // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
> // the synchronize block below. Suggestions welcome here too! :-)
> synchronized (this) {
>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false,
> null);
>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
> }
>
> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>
> Here are some things bothering me:
>
>    - Conversion to an RDD and back again so that we can use coalesce() to
>    reduce the number of partitions. This is because we read that repartition()
>    is not as efficient as coalesce(), and local micro benchmarks seemed to
>    somewhat confirm that this was faster. Is this really a good idea though?
>    Should we be doing something else?
>
Repartition uses coalesce, but with a forced shuffle step: it's just a
shortcut for coalesce(xxx, true).
Doing a coalesce sounds correct, I'd do the same :) Note that if you add
the shuffle step, your partitions should be better balanced.
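
For example, on a JavaRDD<Row> (e.g. from dfInput.javaRDD()); quick, untested
sketch, where "rows" and "targetPartitions" just stand for the values you
already have in your snippet:

// repartition(n) is just a shortcut for coalesce(n, shuffle = true),
// so these two lines are equivalent:
JavaRDD<Row> balanced  = rows.repartition(targetPartitions);
JavaRDD<Row> balanced2 = rows.coalesce(targetPartitions, true);

// Without the shuffle, coalesce only merges existing partitions locally:
// cheaper, but the resulting partitions can end up unevenly sized.
JavaRDD<Row> merged = rows.coalesce(targetPartitions, false);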

>
>    - Usage of unionAll() - this is the only way I could find to join the
>    separate data sets into a single data frame to save as Parquet. Is there a
>    better way?
>
When using the input formats directly, you can call
FileInputFormat.addInputPath once per path; it should perform at least as
well as the union.
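
Rough, untested sketch (assuming avro-mapred is on the classpath and "sc" is
your JavaSparkContext; the classes come from org.apache.hadoop.mapreduce,
org.apache.avro.mapred/mapreduce and org.apache.spark.api.java; raw types to
keep it short):

// Register every input directory on a single Hadoop Job, no unionAll needed:
Job job = Job.getInstance(new Configuration());
for (String path : inputPaths) {
    FileInputFormat.addInputPath(job, new Path(path));
}

// Read them all as one RDD of Avro records:
JavaPairRDD<AvroKey, NullWritable> records =
    sc.newAPIHadoopRDD(job.getConfiguration(),
                       AvroKeyInputFormat.class,
                       AvroKey.class,
                       NullWritable.class);

You'd then have to take care of the Parquet writing side yourself, which is
part of why I'd stick with your DataFrame version (see below).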

>
>    - Do I need to be using the DataFrame API at all? I'm not querying any
>    data here, so the nice API for SQL-like transformations of the data isn't
>    being used. The DataFrame API just seemed like the path of least resistance
>    for working with Avro and Parquet. Would there be any advantage to using
>    hadoopRDD() with the appropriate Input/Output formats?
>
>
>
Using the input/output formats directly sounds viable. But the snippet you
show seems clean enough, and I'm not sure there would be much value in
making something (maybe) slightly faster but harder to understand.


Eugen

> Any advice or tips greatly appreciated!
>
> James.
