Unfortunately, in 1.5 we didn't force operators to spill when they ran out of memory, so there is not a lot you can do. It would be awesome if you could test with 1.6 and see whether things are any better.
On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson wrote:
> I am using Spark 1.5.1. I am running into some memory problems with a Java
> unit test. Yes, I could fix it by setting -Xmx (it's set to 1024M); however, I
> want to better understand what is going on so I can write better code
> in the future. The test runs on a Mac, master="local[2]".
>
> I have a Java unit test that starts by reading a 672K ASCII file. My
> output data file is 152K. It seems strange that such a small amount of
> data would cause an out-of-memory exception. I am running a pretty standard
> machine learning process:
>
> 1. Load data
> 2. Create an ML pipeline
> 3. Transform the data
> 4. Train a model
> 5. Make predictions
> 6. Join the predictions back to my original data set
> 7. coalesce(1): I only have a small amount of data and want to save it
>    in a single file
> 8. Save final results back to disk
>
> Step 7: I am unable to call coalesce(): "java.io.IOException: Unable to
> acquire memory"
>
> To try and figure out what is going on, I put in log messages to count the
> number of partitions.
>
> It turns out I have 20 input files, and each one winds up in a separate
> partition. Okay, so after loading I call coalesce(1) and check to make sure
> I only have a single partition.
>
> The total number of observations is 1998.
>
> After step 7 I counted the number of partitions and discovered I
> have 224 partitions! That is surprising, given I called coalesce(1) before I
> did anything with the data. My data set should easily fit in memory. When I
> save the results to disk, 202 files are created, 162 of them empty!
>
> In general I am not explicitly using cache.
>
> Some of the data frames get registered as tables; I find it easier to use
> SQL.
>
> Some of the data frames get converted back to RDDs; I find it easier to
> create RDD<LabeledPoint> this way.
>
> I put calls to unpersist(true).
> In several places I call this memory check:
>
>     private void memoryCheck(String name) {
>         Runtime rt = Runtime.getRuntime();
>         logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
>                 name,
>                 String.format("%,d", rt.totalMemory()),
>                 String.format("%,d", rt.freeMemory()));
>     }
>
> Any idea how I can get a better understanding of what is going on? My goal
> is to learn to write better Spark code.
>
> Kind regards
>
> Andy
>
> Memory usage at various points in my unit test:
>
>     name: rawInput          totalMemory:   447,741,952  freeMemory:   233,203,184
>     name: naiveBayesModel   totalMemory:   509,083,648  freeMemory:   403,504,128
>     name: lpRDD             totalMemory:   509,083,648  freeMemory:   402,288,104
>     name: results           totalMemory:   509,083,648  freeMemory:   368,011,008
>
>     DataFrame exploreDF = results.select(results.col("id"),
>             results.col("label"),
>             results.col("binomialLabel"),
>             results.col("labelIndex"),
>             results.col("prediction"),
>             results.col("words"));
>     exploreDF.show(10);
>
> Yes, I realize it's strange to switch styles; however, this should not cause
> memory problems.
>
>     final String exploreTable = "exploreTable";
>     exploreDF.registerTempTable(exploreTable);
>     String fmt = "SELECT * FROM %s WHERE binomialLabel = 'signal'";
>     String stmt = String.format(fmt, exploreTable);
>     DataFrame subsetToSave = sqlContext.sql(stmt); // .show(100);
>
>     name: subsetToSave      totalMemory: 1,747,451,904  freeMemory: 1,049,447,144
>
> exploreDF.unpersist(true); does not resolve the memory issue.
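For anyone reproducing these measurements, below is a standalone sketch of the quoted memoryCheck helper that uses only the JDK. It is not the original code: the SLF4J-style `logger` is replaced with `System.err` (an assumption, so the sketch runs without a logging dependency), and the class name `MemoryCheck` and its helper methods are hypothetical. It also logs two values the original did not: used memory (`totalMemory() - freeMemory()`) and `maxMemory()`, the ceiling the heap may grow to. Logging `maxMemory()` is a cheap way to confirm which heap limit the test JVM actually received. That matters here because the subsetToSave reading in the quoted message reports a totalMemory of 1,747,451,904 bytes, more than the 1024M the message says -Xmx is set to.

```java
import java.util.Locale;

public final class MemoryCheck {

    private MemoryCheck() {}

    /** Formats a byte count with thousands separators, e.g. 447741952 -> "447,741,952". */
    static String bytes(long n) {
        // Locale.US pins the grouping separator to ',' regardless of the JVM default locale.
        return String.format(Locale.US, "%,d", n);
    }

    /**
     * Builds one log line. "used" is derived as total - free; "max" is the heap ceiling
     * reported by Runtime.maxMemory(), which is what -Xmx is meant to control.
     */
    static String memoryLine(String name, long total, long free, long max) {
        return String.format(Locale.US,
                "name: %s\ttotalMemory: %s\tfreeMemory: %s\tusedMemory: %s\tmaxMemory: %s",
                name, bytes(total), bytes(free), bytes(total - free), bytes(max));
    }

    /** Samples the current JVM heap and prints one line to stderr (stand-in for logger.warn). */
    static void memoryCheck(String name) {
        Runtime rt = Runtime.getRuntime();
        System.err.println(memoryLine(name, rt.totalMemory(), rt.freeMemory(), rt.maxMemory()));
    }

    public static void main(String[] args) {
        // Live reading from this JVM.
        memoryCheck("startup");
        // First reading from the quoted message; the max value is the nominal -Xmx1024M
        // from the message (1024 * 1024 * 1024 bytes), not a measured number.
        System.out.println(memoryLine("rawInput", 447_741_952L, 233_203_184L, 1_073_741_824L));
    }
}
```

For the rawInput reading in the quoted message, the derived used-memory figure works out to 214,538,768 bytes. Calling `memoryCheck` at the same points as the original (after loading, after training, before `coalesce(1)`) would show whether used memory or the heap ceiling is the thing that changes.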
