Hi Michael,

I'll try 1.6 and report back.

The Javadoc does not say much about coalesce() or repartition(). When I use repartition() just before I save my output, everything runs as expected. I thought coalesce() was an optimized version of repartition() and should be used whenever we know we are reducing the number of partitions.

Kind regards,

Andy
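For what it's worth, the behavior difference matches how the two calls are implemented: on the RDD API, repartition(n) is literally coalesce(n, shuffle = true), while plain coalesce(n) merges partitions without a shuffle, so everything upstream of a coalesce(1) collapses into a single task. A minimal sketch of the two variants before a save (Spark 1.5 Java API; the output paths are stand-ins, and "results" is the DataFrame from the thread below):

    // coalesce(1) avoids a shuffle, but the whole upstream computation
    // then runs in one task, concentrating memory pressure there.
    results.coalesce(1).write().json("/tmp/out-coalesce");

    // repartition(1) inserts a shuffle: upstream stages keep their
    // parallelism, and only the final write stage runs as one task.
    results.repartition(1).write().json("/tmp/out-repartition");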
From: Michael Armbrust <mich...@databricks.com>
Date: Monday, December 28, 2015 at 2:41 PM
To: Andrew Davidson <a...@santacruzintegration.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

> Unfortunately, in 1.5 we didn't force operators to spill when they ran out of
> memory, so there is not a lot you can do. It would be awesome if you could
> test with 1.6 and see if things are any better?
>
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <a...@santacruzintegration.com>
> wrote:
>> I am using Spark 1.5.1. I am running into some memory problems with a Java
>> unit test. Yes, I could fix it by raising Xmx (it is set to 1024M); however,
>> I want to better understand what is going on so I can write better code in
>> the future. The test runs on a Mac, master = "local[2]".
>>
>> I have a Java unit test that starts by reading a 672K ASCII file. My output
>> data file is 152K. It seems strange that such a small amount of data would
>> cause an out-of-memory exception. I am running a pretty standard machine
>> learning process:
>>
>> 1. Load the data
>> 2. Create an ML pipeline
>> 3. Transform the data
>> 4. Train a model
>> 5. Make predictions
>> 6. Join the predictions back to my original data set
>> 7. coalesce(1); I only have a small amount of data and want to save it in a
>>    single file
>> 8. Save the final results back to disk
>>
>> Step 7 is where I fail: I am unable to call coalesce():
>> "java.io.IOException: Unable to acquire memory"
>>
>> To try to figure out what is going on, I put in log messages to count the
>> number of partitions.
>>
>> It turns out I have 20 input files, and each one winds up in a separate
>> partition. So after loading I call coalesce(1) and check to make sure I
>> only have a single partition.
>>
>> The total number of observations is 1998.
>>
>> After calling step 7 I count the number of partitions and discover I have
>> 224 partitions! Surprising, given that I called coalesce(1) before I did
>> anything with the data. My data set should easily fit in memory. When I
>> save it to disk I get 202 files, 162 of them empty!
>>
>> In general I am not explicitly using cache.
>>
>> Some of the data frames get registered as tables; I find it easier to use
>> SQL. Some of the data frames get converted back to RDDs; I find it easier
>> to create RDD<LabeledPoint> that way.
>>
>> I put calls to unpersist(true) in several places.
>>
>> private void memoryCheck(String name) {
>>     Runtime rt = Runtime.getRuntime();
>>     // Log the JVM heap totals at a named checkpoint in the test.
>>     logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
>>             name,
>>             String.format("%,d", rt.totalMemory()),
>>             String.format("%,d", rt.freeMemory()));
>> }
>>
>> Any idea how I can get a better understanding of what is going on? My goal
>> is to learn to write better Spark code.
>>
>> Kind regards,
>>
>> Andy
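The 224 partitions and the ~200 output files reported above line up with spark.sql.shuffle.partitions, which defaults to 200 and decides how many partitions any DataFrame shuffle produces (including the join in step 6), regardless of an earlier coalesce(1). Below is a minimal sketch of a partition-count logger along the lines described in the message, plus lowering that setting for a small local test; the helper name and the value "4" are assumptions, not code from the thread:

    // Hypothetical helper: log how many partitions back a DataFrame.
    private void logPartitionCount(String name, DataFrame df) {
        logger.warn("name: {} numPartitions: {}",
                name, df.javaRDD().partitions().size());
    }

    // In the test setup: keep post-shuffle partition counts small so a
    // join does not fan a tiny data set back out to 200 partitions.
    sqlContext.setConf("spark.sql.shuffle.partitions", "4");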
>> Memory usage at various points in my unit test:
>>
>> name: rawInput         totalMemory: 447,741,952    freeMemory: 233,203,184
>> name: naiveBayesModel  totalMemory: 509,083,648    freeMemory: 403,504,128
>> name: lpRDD            totalMemory: 509,083,648    freeMemory: 402,288,104
>> name: results          totalMemory: 509,083,648    freeMemory: 368,011,008
>>
>> DataFrame exploreDF = results.select(results.col("id"),
>>         results.col("label"),
>>         results.col("binomialLabel"),
>>         results.col("labelIndex"),
>>         results.col("prediction"),
>>         results.col("words"));
>> exploreDF.show(10);
>>
>> Yes, I realize it is strange to switch styles; however, this should not
>> cause memory problems.
>>
>> final String exploreTable = "exploreTable";
>> exploreDF.registerTempTable(exploreTable);
>> String fmt = "SELECT * FROM %s WHERE binomialLabel = 'signal'";
>> String stmt = String.format(fmt, exploreTable);
>>
>> DataFrame subsetToSave = sqlContext.sql(stmt); // .show(100);
>>
>> name: subsetToSave     totalMemory: 1,747,451,904  freeMemory: 1,049,447,144
>>
>> exploreDF.unpersist(true); does not resolve the memory issue.
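One more note on the unpersist(true) calls mentioned in the thread: unpersist only releases blocks that an earlier cache() or persist() pinned, so on a DataFrame that was never persisted it is a no-op, which would explain why it has no visible effect here. A minimal sketch of the paired calls, reusing exploreDF from above (requires org.apache.spark.storage.StorageLevel):

    exploreDF.persist(StorageLevel.MEMORY_AND_DISK());
    exploreDF.show(10);        // an action materializes and caches the rows
    exploreDF.unpersist(true); // blocking = true waits until blocks are freed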