Hello Spark folks, I am doing some simple parsing of a CSV input file, and the input file is very large (~1 GB). It seems I have a memory leak, and it is destroying my server. After using jmap to generate a Java heap dump and inspecting it with the Eclipse Memory Analyzer, I learned that when I read in my big CSV and parse out the Double values, I end up with multiple gigabytes of Strings in the heap that have to be garbage collected.
After using the Memory Analyzer, it looks like the problem occurs right where I commented, at the line with LabeledPoint(...): the references NEVER seem to go away, and they just eat up my memory. Any clue what's wrong here? The code is below, with ALL CAPS comments marking the suspect line. I put the whole method in there, since I might be doing something obviously wrong that I don't understand at all. Thank you!

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.classification.LogisticRegressionModel
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.L1Updater
    import org.apache.spark.mllib.regression.LabeledPoint

    def generateModels(sc: SparkContext, trainDataFile: String): Map[String, LogisticRegressionModel] = {
      val properties = new Config("src/main/resources/config.properties").properties
      val numIterations = properties("logreg.num.iterations").toInt

      val rddText = sc.textFile(trainDataFile)
      val rddGroup = rddText.map(line => line.split(",")).groupBy(_.last) // now grouped by category!

      val rddGroupedLabeledPoints: RDD[(String, Iterable[LabeledPoint])] = rddGroup map { grp =>
        (grp._1, // which category is this input data
         grp._2.map { rawRow =>
           val rowNoCategory = rawRow.slice(1, rawRow.length - 1) // preparing to create LabeledPoint
           // I THINK THE PROBLEM IS RIGHT HERE, WHERE I CREATE THE LabeledPoint
           // AND PARSE OUT THE DOUBLES FROM rowNoCategory
           LabeledPoint(rawRow(0).toDouble, Vectors.dense(rowNoCategory.map(_.toDouble)))
           // I THINK THE PROBLEM IS RIGHT HERE, THE LINE ABOVE
         })
      }

      val getCats: Array[String] = rddGroupedLabeledPoints map { catLPs => catLPs._1.toString } collect()
      System.err.println("Number of offers/categories discovered in input data: " + getCats.size)

      val arrCatModels: Array[(String, LogisticRegressionModel)] = getCats map { category =>
        (category.toString, {
          val singleGroupRdd: RDD[LabeledPoint] =
            rddGroupedLabeledPoints filter (grp => grp._1 == category.toString) flatMap (catLPs => catLPs._2)
          val balancedSingleGroupRdd: RDD[LabeledPoint] = balanceBinaryLabeledPoints(singleGroupRdd)
          balancedSingleGroupRdd.cache()
          val trainedModel: LogisticRegressionModel =
            learnModelRegularized(balancedSingleGroupRdd, numIterations, new L1Updater)
          trainedModel.clearThreshold()
        })
      }

      arrCatModels.toMap
    }
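In case it helps diagnose, here is a minimal sketch of the same parsing done line by line, without the groupBy, that I could compare against to see whether the Double parsing alone fills the heap. It assumes the same column layout as above (label first, feature columns in the middle, category last); the helper name parsePerLine is just made up for this sketch and is not part of my method.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Parse each CSV line straight to (category, LabeledPoint), without holding
    // grouped Iterables of raw String arrays. Hypothetical helper for comparison only.
    def parsePerLine(sc: SparkContext, trainDataFile: String): RDD[(String, LabeledPoint)] = {
      sc.textFile(trainDataFile).map { line =>
        val fields = line.split(",")
        val category = fields.last                                  // last column is the category
        val label = fields(0).toDouble                              // first column is the label
        val features = fields.slice(1, fields.length - 1).map(_.toDouble)
        (category, LabeledPoint(label, Vectors.dense(features)))
      }
    }

    // The per-category training data would then come from a filter instead of a groupBy, e.g.:
    // val singleGroupRdd: RDD[LabeledPoint] =
    //   parsePerLine(sc, trainDataFile).filter(_._1 == category).map(_._2)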