Strings have a huge overhead in the JVM. The official tuning guide is very useful: http://spark.apache.org/docs/latest/tuning.html#memory-tuning . In your case, since the input elements are all numbers, please convert them into doubles right after the split (before the groupBy) and try to use primitive arrays to reduce the overhead.
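As a rough, untested sketch of what I mean (reusing the names from your method; Config, balanceBinaryLabeledPoints, and learnModelRegularized stay as they are), the parsing part could look like this:

    // Rough sketch, not tested against your pipeline.
    import org.apache.spark.SparkContext._   // pair-RDD implicits (groupByKey) in Spark 1.x
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val rddText = sc.textFile(trainDataFile)

    // Parse to primitive doubles right after the split, before the groupBy,
    // so the per-field Strings can be garbage collected immediately.
    val rddParsed: RDD[(String, Array[Double])] = rddText.map { line =>
      val fields = line.split(",")
      val category = fields.last                        // last column is the category
      val values = new Array[Double](fields.length - 1) // primitive array, no boxing
      var i = 0
      while (i < values.length) {
        values(i) = fields(i).toDouble
        i += 1
      }
      (category, values)
    }

    // Group the already-parsed doubles by category, then build the LabeledPoints
    // from the numeric columns: values(0) is the label, the rest are the features.
    val rddGroupedLabeledPoints: RDD[(String, Iterable[LabeledPoint])] =
      rddParsed.groupByKey().map { case (category, rows) =>
        (category, rows.map(vals => LabeledPoint(vals(0), Vectors.dense(vals.slice(1, vals.length)))))
      }

With this, only one String (the category) and one primitive Array[Double] per row survive past the map, so the split Strings become garbage right away instead of being carried into the grouped data.

-Xiangrui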
On Thu, Oct 9, 2014 at 2:25 PM, Aris <arisofala...@gmail.com> wrote:
> Hello Spark folks,
>
> I am doing a simple parsing of a CSV input file, and the input file is very
> large (~1GB). It seems I have a memory leak here and I am destroying my
> server. After using jmap to generate a Java heap dump and using the Eclipse
> Memory Analyzer, I basically learned that when I read in my big CSV and
> parse out the Double values... I end up having multiple gigabytes of strings
> in the heap space that have to get garbage collected.
>
> After using the Memory Analyzer, it looks like the problem occurs right
> where I commented in the line below with LabeledPoint(...)
>
> It appears that the references NEVER go away, and this just eats up my
> memory.
>
> Any clue what's wrong here? The code is below, and I added an ALL CAPS
> comment... I put the whole method in there, since I might be doing something
> obviously wrong that I don't understand at all.
>
> Thank you!
>
>
>   def generateModels(sc: SparkContext, trainDataFile: String): Map[String, LogisticRegressionModel] = {
>
>     val properties = new Config("src/main/resources/config.properties").properties
>     val numIterations = properties("logreg.num.iterations").toInt
>
>     val rddText = sc.textFile(trainDataFile)
>     val rddGroup = rddText.map(line => line.split(",")).groupBy(_.last) // now grouped by category!
>     val rddGroupedLabeledPoints: RDD[(String, Iterable[LabeledPoint])] = rddGroup map { grp =>
>       (grp._1, // which category is this input data
>        grp._2.map { rawRow =>
>          val rowNoCategory = rawRow.slice(1, rawRow.length - 1) // preparing to create LabeledPoint
>          // I THINK THE PROBLEM IS RIGHT HERE, WHERE I CREATE THE LabeledPoint AND PARSE OUT THE DOUBLES FROM rowNoCategory
>          LabeledPoint(rawRow(0).toDouble, Vectors.dense(rowNoCategory.map(_.toDouble)))
>          // I THINK THE PROBLEM IS RIGHT HERE, THE LINE ABOVE
>        }
>       )
>     }
>
>     val getCats: Array[String] = rddGroupedLabeledPoints map { catLPs =>
>       catLPs._1.toString
>     } collect()
>
>     System.err.println("Number of offers/categories discovered in input data: " + getCats.size)
>
>     val arrCatModels: Array[(String, LogisticRegressionModel)] = getCats map { category =>
>       (category.toString,
>        {
>          val singleGroupRdd: RDD[LabeledPoint] = rddGroupedLabeledPoints filter (grp => grp._1 == category.toString) flatMap (catLPs => catLPs._2)
>          val balancedSingleGroupRdd: RDD[LabeledPoint] = balanceBinaryLabeledPoints(singleGroupRdd)
>          balancedSingleGroupRdd.cache()
>          val trainedModel: LogisticRegressionModel = learnModelRegularized(balancedSingleGroupRdd, numIterations, new L1Updater)
>          trainedModel.clearThreshold()
>        }
>       )
>     }
>
>     arrCatModels.toMap
>
>   }
>
> }

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org