Hi

Why don't you check if you can process the large file on its own first, and
then do the outer loop next? Something like:

sqlContext.read.json(jsonFile)
  .select($"some", $"fields")
  .withColumn("new_col", some_transformations($"col"))
  .rdd.map { x: Row => (k, v) }
  .combineByKey()
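
If the single large file goes through on its own, a sequential version of the
whole job might look roughly like the sketch below. This is only a sketch under
assumptions: the column names key_col and val_col and the list-building
combiner functions are placeholders I made up, so substitute your own schema
and aggregation logic (and the usual import sqlContext.implicits._ for the $
syntax).

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Process one JSON file at a time into a per-file pair RDD.
val perFile: Seq[RDD[(String, List[String])]] = file_path_list.map { jsonFile =>
  sqlContext.read.json(jsonFile)
    .select($"key_col", $"val_col")
    .rdd
    .map { row: Row => (row.getAs[String]("key_col"), row.getAs[String]("val_col")) }
    .combineByKey(
      (v: String) => List(v),                         // createCombiner: start a list for a new key
      (acc: List[String], v: String) => v :: acc,     // mergeValue: add a value within a partition
      (a: List[String], b: List[String]) => a ::: b   // mergeCombiners: concatenate across partitions
    )
}

// Union the per-file results and combine the item lists across files.
val combined = perFile
  .reduce(_ union _)
  .combineByKey(
    (vs: List[String]) => vs,
    (acc: List[String], vs: List[String]) => vs ::: acc,
    (a: List[String], b: List[String]) => a ::: b
  )

It keeps the same structure as your pseudo-code, just with the combineByKey
arguments spelled out.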

Deenar

On 14 October 2015 at 05:18, SLiZn Liu <sliznmail...@gmail.com> wrote:

> Hey Spark Users,
>
> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
> massive number of JSON files iteratively via read.json(). Even though the
> resulting RDD is rather small, I still get the OOM error. The brief structure
> of my program reads as follows, in pseudo-code:
>
> file_path_list.map{ jsonFile: String =>
>   sqlContext.read.json(jsonFile)
>     .select($"some", $"fields")
>     .withColumn("new_col", some_transformations($"col"))
>     .rdd.map( x: Row => (k, v) )
>     .combineByKey() // which groups a column into item lists by another column as keys
> }.reduce( (i, j) => i.union(j) )
> .combineByKey() // which combines results from all json files
>
> I confess some of the JSON files are gigabytes in size, yet the combined RDD
> is only a few megabytes. I’m not familiar with the under-the-hood mechanism,
> but my intuitive understanding of how the code executes is: read the files
> one at a time (where I can easily change map to foreach when fetching from
> file_path_list, if that’s what it takes), do the inner transformation on the
> DF and combine, then reduce and do the outer combine immediately, which
> doesn’t require holding all the RDDs generated from all files in memory.
> Obviously, since my code raises the OOM error, I must have missed something
> important.
>
> From the debug log, I can tell the OOM error happens while reading the same
> file, which is a modest 2GB in size, while driver.memory is set to 13GB and
> the available memory before execution is around 8GB, on my standalone
> machine running as “local[8]”.
>
> To overcome this, I also tried initializing an empty universal RDD variable,
> iteratively reading one file at a time using foreach, and then, instead of
> reduce, simply combining each RDD generated from the JSON files, but the
> OOM error remains.
>
> Other configurations:
>
>    - set("spark.storage.memoryFraction", "0.1") // no cache of RDD is used
>    - set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
> Any suggestions other than scaling the Spark cluster up/out?
>
> BR,
> Todd Leo
>
