I am storing the output of mapPartitions in a ListBuffer and exposing its
iterator as the output. The output is a list of Long tuples(Tuple2). When I
check the size of the object using Spark's SizeEstimator.estimate method it
comes out to 80 bytes per record/tuple object(calculating this by "size of
ListBuffer object/# records"). This I think is too huge for a Tuple2 object
of long type(two 8 byte longs + some object overhead memory). Any ideas why
this is so and how to reduce the memory captured by output? I am sure I am
missing something obvious.
Also, these ListBuffer object are getting too huge for memory leading to
memory and disk spills causing bad performance. Any ideas on how I can just
simply write the output of mapPartitions without storing the whole output as
an in-memory object. Each input record to mapPartitions can generate 0 or
more output records, so I think I cannot use "rdd.map" function iterator. I
am not sure even if that will help my cause.
Here is the code code snippet.
/var outputRDD = sortedRDD.mapPartitionsWithIndex((partitionNo,p) => {
var outputList = ListBuffer[(Long,Long)]()
var inputCnt: Long = 0;
var outputCnt: Long = 0;
while (p.hasNext) {
inputCnt = inputCnt + 1;
val tpl = p.next()
var partitionKey = ""
try{
partitionKey = tpl._1.split(keyDelimiter)(0)
//Partition key
}catch{
case aob : ArrayIndexOutOfBoundsException => {
println("segmentKey:"+partitionKey);
}
}
val value = tpl._2
var xs: Array[Any] = value.toSeq.toArray;
//value.copyToArray(xs);
val xs_string : Array[String] = new Array[String](value.size);
for(i <- 0 to value.size-1){
xs_string(i) = xs(i) match { case None => ""
case null => ""
case _ => xs(i).toString()
}
}
val outputTuples = windowObject.process(partitionKey, xs_string);
if(outputTuples != null){
for (i <- 0 until outputTuples.size()) {
val outputRecord = outputTuples.get(i)
if (outputRecord != null) {
outputList +=
((outputRecord.getProfileID1 ,
outputRecord.getProfileID2))
outputCnt = outputCnt +1;
}
}
}
}
if(debugFlag.equals("DEBUG")){
logger.info("partitionNo:"+ partitionNo + ", input #: "+
inputCnt +",
output #: "+ outputCnt+", outputList object size:" +
SizeEstimator.estimate(outputList));
}
outputList.iterator
}, false)/
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-mapPartition-output-object-size-coming-larger-than-expected-tp28367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe e-mail: [email protected]