Hello Spark folks,

I am doing some simple parsing of a CSV input file, and the input file is
very large (~1 GB). It seems I have a memory leak, and it is bringing down
my server. After using jmap to generate a Java heap dump and inspecting it
with the Eclipse Memory Analyzer, I learned that when I read in my big CSV
and parse out the Double values, I end up with multiple gigabytes of Strings
in the heap that have to be garbage collected.
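
To make that concrete, the per-line parse boils down to roughly this (a
simplified, standalone sketch with a made-up example row, not my exact code;
the layout is label first, feature values in the middle, category last):

    // simplified shape of the per-line parse: each line becomes an
    // Array[String] first, and the Doubles are extracted from those
    // transient Strings afterwards
    val line = "1.0,0.5,2.3,4.1,categoryA"            // hypothetical example row
    val fields: Array[String] = line.split(",")       // label, features..., category
    val label: Double = fields(0).toDouble
    val features: Array[Double] = fields.slice(1, fields.length - 1).map(_.toDouble)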

According to the Memory Analyzer, the problem occurs right at the line I
commented below, where I create the LabeledPoint(...). It appears that the
references NEVER go away, and this just eats up my memory.

Any clue what's wrong here? The code is below, and I added an ALL CAPS
comment marking the spot. I put the whole method in there, since I might be
doing something obviously wrong that I don't understand at all.

Thank you!



    // imports for the Spark/MLlib types used below (normally at the top of the file)
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.classification.LogisticRegressionModel
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.L1Updater

    def generateModels(sc: SparkContext, trainDataFile: String): Map[String, LogisticRegressionModel] = {

      val properties = new Config("src/main/resources/config.properties").properties
      val numIterations = properties("logreg.num.iterations").toInt

      // read the raw CSV and group the split rows by their last field
      val rddText = sc.textFile(trainDataFile)
      val rddGroup = rddText.map(line => line.split(",")).groupBy(_.last) // now grouped by category!

      // turn each group of raw String rows into (category, Iterable[LabeledPoint])
      val rddGroupedLabeledPoints: RDD[(String, Iterable[LabeledPoint])] = rddGroup map { grp =>
        (grp._1, // which category is this input data
          grp._2.map { rawRow =>
            val rowNoCategory = rawRow.slice(1, rawRow.length - 1) // preparing to create LabeledPoint
            // I THINK THE PROBLEM IS RIGHT HERE, WHERE I CREATE THE LabeledPoint AND
            // PARSE OUT THE DOUBLES FROM rowNoCategory
            LabeledPoint(rawRow(0).toDouble, Vectors.dense(rowNoCategory.map(_.toDouble)))
            // I THINK THE PROBLEM IS RIGHT HERE, THE LINE ABOVE
          }
        )
      }

      // collect the distinct category names to the driver
      val getCats: Array[String] = rddGroupedLabeledPoints map { catLPs =>
        catLPs._1.toString
      } collect()

      System.err.println("Number of offers/categories discovered in input data: " + getCats.size)

      // train one regularized logistic regression model per category
      val arrCatModels: Array[(String, LogisticRegressionModel)] = getCats map { category =>
        (category.toString,
          {
            val singleGroupRdd: RDD[LabeledPoint] =
              rddGroupedLabeledPoints filter (grp => grp._1 == category.toString) flatMap (catLPs => catLPs._2)
            val balancedSingleGroupRdd: RDD[LabeledPoint] = balanceBinaryLabeledPoints(singleGroupRdd)
            balancedSingleGroupRdd.cache()
            val trainedModel: LogisticRegressionModel =
              learnModelRegularized(balancedSingleGroupRdd, numIterations, new L1Updater)
            trainedModel.clearThreshold()
          }
        )
      }

      arrCatModels.toMap

    }

}
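
For what it's worth, one alternative I have been considering (an untested
sketch, and I'm not sure it actually fixes the retention) is to parse each
line straight to a (category, LabeledPoint) pair before any grouping, so the
intermediate String arrays stay short-lived, and then cache only the parsed
points:

    // untested sketch, not my current code: parse first, group/filter afterwards,
    // so only LabeledPoints (not the raw String arrays) are kept per category
    val rddCatPoints: RDD[(String, LabeledPoint)] = rddText map { line =>
      val fields = line.split(",")
      val features = fields.slice(1, fields.length - 1).map(_.toDouble)
      (fields.last, LabeledPoint(fields(0).toDouble, Vectors.dense(features)))
    }
    rddCatPoints.cache()
    // the per-category training data would then be:
    //   rddCatPoints.filter(_._1 == category).map(_._2)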
