Hi,

Why don't you check whether you can process the large file on its own first, and only then add the outer loop? Something like:
sqlContext.read.json(jsonFile)
  .select($"some", $"fields")
  .withColumn("new_col", some_transformations($"col"))
  .rdd
  .map { x: Row => (k, v) }
  .combineByKey()

Deenar

On 14 October 2015 at 05:18, SLiZn Liu <sliznmail...@gmail.com> wrote:

> Hey Spark Users,
>
> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
> massive amount of json files, iteratively via read.json(). Even though the
> resulting RDD is rather small, I still get the OOM Error. The brief
> structure of my program reads as follows, in pseudo-code:
>
> file_path_list.map { jsonFile: String =>
>   sqlContext.read.json(jsonFile)
>     .select($"some", $"fields")
>     .withColumn("new_col", some_transformations($"col"))
>     .rdd.map { x: Row => (k, v) }
>     .combineByKey() // which groups a column into item lists by another
>                     // column as keys
> }.reduce( (i, j) => i.union(j) )
>  .combineByKey() // which combines results from all json files
>
> I confess some of the json files are Gigabytes huge, yet the combined RDD
> is only a few Megabytes. I'm not familiar with the under-the-hood
> mechanism, but my intuitive understanding of how the code executes is:
> read the files one at a time (where I can easily change map to foreach
> when fetching from file_path_list, if that's the case), do the inner
> transformation on the DF and combine, then reduce and do the outer
> combine immediately, which doesn't require holding all RDDs generated
> from all files in memory. Obviously, since my code raises an OOM Error,
> I must have missed something important.
>
> From the debug log, I can tell the OOM Error happens when reading the
> same file, which is a modest 2GB in size, while driver.memory is set to
> 13GB, and the available memory before execution is around 8GB, on my
> standalone machine running as "local[8]".
>
> To work around this, I also tried initializing an empty universal RDD
> variable, iteratively reading one file at a time using foreach, and then,
> instead of reduce, simply combining each RDD generated from the json
> files, but the OOM Error remains.
>
> Other configurations:
>
> - set("spark.storage.memoryFraction", "0.1") // no cache of RDD is used
> - set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
> Any suggestions other than scaling the Spark cluster up/out?
>
> BR,
> Todd Leo
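For reference, a minimal end-to-end sketch of the per-file-aggregate-then-merge pattern being discussed might look like the code below. The column names key_col and val_col, the object name PerFileAggregation, and the identity UDF someTransformations are placeholders for the real schema and transformation; this is only a sketch of the loop structure under those assumptions, not a drop-in fix for the OOM itself.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.functions.udf

object PerFileAggregation {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("per-file-aggregation").setMaster("local[8]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Paths to the JSON files, one file per element.
    val filePathList: Seq[String] = args.toSeq

    // Placeholder for the real transformation; here just an identity on the column.
    val someTransformations = udf { (s: String) => s }

    // Step 1: for each file, define a pipeline that reduces it to a small
    // pair RDD of per-key value lists (hypothetical columns key_col / val_col).
    val perFile: Seq[RDD[(String, List[String])]] = filePathList.map { jsonFile =>
      sqlContext.read.json(jsonFile)
        .select($"key_col", $"val_col")
        .withColumn("new_col", someTransformations($"val_col"))
        .rdd
        .map { row: Row => (row.getAs[String]("key_col"), row.getAs[String]("new_col")) }
        // Group values into a list per key; this is the "small" result per file.
        .combineByKey[List[String]](
          (v: String) => List(v),
          (acc: List[String], v: String) => v :: acc,
          (a: List[String], b: List[String]) => a ::: b)
    }

    // Step 2: union the per-file aggregates and merge them by key. Only the
    // already-aggregated (key, list) pairs enter this final shuffle, not the
    // raw rows from the large files.
    val combined = sc.union(perFile).reduceByKey(_ ::: _)

    combined.take(10).foreach(println)
    sc.stop()
  }
}

Because each file is collapsed to per-key lists before the union, the final reduceByKey only ever sees the small aggregates, which is the behaviour the original pseudo-code seems to be aiming for.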