Hi,

I'm not exactly sure what your code looks like, but it seems to me this is the correct behaviour. If the total size of the task results the driver fetches exceeds spark.driver.maxResultSize, the driver throws this exception (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68). So, in your case, the driver tries to fetch 1345.5 MB of data (the partial models from all 4 tasks) from the executors, and the job fails.

Thanks,
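P.S. If fetching all four partial models at once is really what you want, the simplest fix is to raise spark.driver.maxResultSize. Another option is to merge the partial models on the executors before the driver fetches them, e.g. with RDD.treeReduce, though with only 4 partitions it may not add an extra aggregation level. A rough sketch follows; Model, learnPartition and mergeModels are only placeholders, since I don't know your actual code:

  import org.apache.spark.SparkConf
  import org.apache.spark.rdd.RDD

  // Option 1: raise the driver-side cap. 2g is only an example value; 0 disables
  // the check. Pass this conf to your SparkContext, or use
  // --conf spark.driver.maxResultSize=2g with spark-submit.
  val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")

  // Option 2 (sketch): merge partial models on the executors with treeReduce.
  // Model, learnPartition and mergeModels stand in for your own code.
  case class Model(weights: Array[Double])
  def learnPartition(rows: Iterator[Array[Double]]): Model = Model(Array.empty)
  def mergeModels(a: Model, b: Model): Model = a

  def runIteration(data: RDD[Array[Double]]): Model = {
    // One partial model per partition, as in your mapPartitions step.
    val partialModels = data.mapPartitions(it => Iterator(learnPartition(it)))
    // RDD.reduce sends every task's partial result to the driver before merging,
    // so 4 partitions x ~330 MB is ~1.3 GB of serialized results at once.
    // treeReduce merges results on the executors level by level before the final
    // fetch; with only 4 partitions the gain is limited, so raising the limit is
    // probably the simpler fix here.
    partialModels.treeReduce(mergeModels, depth = 2)
  }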
On Sat, Mar 5, 2016 at 6:11 AM, James Jia <james...@berkeley.edu> wrote:
> I'm running a distributed KMeans algorithm with 4 executors.
>
> I have an RDD[Data]. I use mapPartitions to run a learner on each data
> partition, and then call reduce with my custom model reduce function to
> merge the partial models and start a new iteration.
>
> The model size is around ~330 MB. I would expect the memory required for
> the serialized results at the driver to be at most 2*330 MB in order to
> reduce two models, but it looks like Spark is serializing all of the
> models to the driver before reducing.
>
> The error message says that the total size of the serialized results is
> 1345.5 MB, which is approximately 4 * 330 MB. I know I can set the
> driver's max result size, but I just want to confirm that this is
> expected behavior.
>
> Thanks!
>
> James
>
> [Stage 0:==============> (1 + 3) / 4]16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
> at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
> ... 50 elided

--
---
Takeshi Yamamuro