Strings have a huge overhead on the JVM. The official tuning guide is very
useful: http://spark.apache.org/docs/latest/tuning.html#memory-tuning
In your case, since the input elements are all numbers, please convert them
into doubles right after the split (before the groupBy) and try to use
primitive arrays to reduce the overhead. -Xiangrui
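
A minimal sketch of that reordering, assuming the CSV layout used in the
quoted code below (label in column 0, feature columns in the middle, category
string in the last column) and the rddText RDD from that method; the names
parsed and byCategory are only illustrative:

    import org.apache.spark.SparkContext._   // pair-RDD functions such as groupByKey (needed on older Spark versions)
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // Parse every line into primitives immediately, so the String values from
    // split() become garbage right away instead of being carried through the
    // groupBy shuffle.
    val parsed = rddText.map { line =>
      val cols = line.split(",")
      val category = cols.last                        // only the category string is kept
      val label = cols(0).toDouble
      val features: Array[Double] =                   // primitive array, no boxed values
        cols.slice(1, cols.length - 1).map(_.toDouble)
      (category, LabeledPoint(label, Vectors.dense(features)))
    }

    // Group by category only after the strings are gone; the shuffle now moves
    // doubles instead of strings.
    val byCategory = parsed.groupByKey()

Everything downstream (the per-category filtering and training) can then run
on byCategory exactly as before.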

On Thu, Oct 9, 2014 at 2:25 PM, Aris <arisofala...@gmail.com> wrote:
> Hello Spark folks,
>
> I am doing some simple parsing of a CSV input file, and the input file is
> very large (~1GB). It seems I have a memory leak here and I am destroying my
> server. After using jmap to generate a Java heap dump and opening it in the
> Eclipse Memory Analyzer, I learned that when I read in my big CSV and parse
> out the Double values, I end up with multiple gigabytes of strings in the
> heap space that have to get garbage collected.
>
> After using the Memory Analyzer, it looks like the problem occurs right at
> the line I commented below, where LabeledPoint(...) is created. It appears
> that the references NEVER go away, and this just eats up my memory.
>
> Any clue what's wrong here? The code is below, and I added an ALL CAPS
> comment. I put the whole method in there, since I might be doing something
> obviously wrong that I don't understand at all.
>
> Thank you!
>
>
>
>     def generateModels(sc: SparkContext, trainDataFile: String): Map[String, LogisticRegressionModel] = {
>
>       val properties = new Config("src/main/resources/config.properties").properties
>       val numIterations = properties("logreg.num.iterations").toInt
>
>       val rddText = sc.textFile(trainDataFile)
>       val rddGroup = rddText.map(line => line.split(",")).groupBy(_.last) // now grouped by category!
>       val rddGroupedLabeledPoints: RDD[(String, Iterable[LabeledPoint])] = rddGroup map { grp =>
>         (grp._1, // which category is this input data
>           grp._2.map { rawRow =>
>             val rowNoCategory = rawRow.slice(1, rawRow.length - 1) // preparing to create LabeledPoint
>             // I THINK THE PROBLEM IS RIGHT HERE, WHERE I CREATE THE LabeledPoint
>             // AND PARSE OUT THE DOUBLES FROM rowNoCategory
>             LabeledPoint(rawRow(0).toDouble, Vectors.dense(rowNoCategory.map(_.toDouble)))
>             // I THINK THE PROBLEM IS RIGHT HERE, THE LINE ABOVE
>           }
>         )
>       }
>
>       val getCats: Array[String] = rddGroupedLabeledPoints map { catLPs =>
>         catLPs._1.toString
>       } collect()
>
>       System.err.println("Number of offers/categories discovered in input data: " + getCats.size)
>
>       val arrCatModels: Array[(String, LogisticRegressionModel)] = getCats map { category =>
>         (category.toString,
>           {
>             val singleGroupRdd: RDD[LabeledPoint] =
>               rddGroupedLabeledPoints filter (grp => grp._1 == category.toString) flatMap (catLPs => catLPs._2)
>             val balancedSingleGroupRdd: RDD[LabeledPoint] = balanceBinaryLabeledPoints(singleGroupRdd)
>             balancedSingleGroupRdd.cache()
>             val trainedModel: LogisticRegressionModel =
>               learnModelRegularized(balancedSingleGroupRdd, numIterations, new L1Updater)
>             trainedModel.clearThreshold()
>           }
>         )
>       }
>
>       arrCatModels.toMap
>
>     }
>
> }
>
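
For what it's worth, writing out the intermediate types of the quoted
pipeline points at where the strings can pile up; a sketch, with the value
names rows and grouped being only illustrative:

    val rows: RDD[Array[String]] = rddText.map(_.split(","))
    val grouped: RDD[(String, Iterable[Array[String]])] = rows.groupBy(_.last)
    // The groupBy shuffle buffers whole Iterables of raw String columns per
    // category before the later map ever turns them into LabeledPoints, which
    // is consistent with the gigabytes of strings in the heap dump and with
    // the suggestion above to parse doubles before the groupBy.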
