Shuffle data is not kept in memory. Did you try additional memory configurations? (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)
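For example, something along these lines (a rough sketch only -- the timeout and memoryFraction values are illustrative, and parseId is just a placeholder for your own JSON parsing):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._                  // pair RDD functions such as groupByKey
import org.apache.spark.storage.StorageLevel

// Illustrative values only -- tune them to your cluster.
// SparkConf is 0.9+; on older versions set these with System.setProperty
// before creating the SparkContext.
val conf = new SparkConf()
  .setAppName("json-group-by-id")
  .set("spark.akka.askTimeout", "120")                  // more headroom than the 30s you tried
  .set("spark.storage.memoryFraction", "0.4")           // leave more of the heap for shuffle buffers
val sc = new SparkContext(conf)

// Persist the keyed RDD with a disk fallback so partitions that do not fit
// in memory spill to local disk instead of being recomputed from S3.
val keyed = sc.textFile(s3url)                          // s3url as in your job
  .keyBy(line => parseId(line))                         // parseId stands in for _.parseJson.id
  .persist(StorageLevel.MEMORY_AND_DISK)

Persisting the RDD only affects the keyed data itself; as noted above, the shuffle data written by groupByKey goes to each worker's local disk rather than staying in memory.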
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <do...@celtra.com> wrote:

> Hi
>
> I have a Spark cluster with 4 workers, each with 13GB of RAM. I would like
> to process a large data set (it does not fit in memory) that consists of
> JSON entries. These are the transformations applied:
>
> SparkContext.textFile(s3url)                       // read files from s3
>   .keyBy(_.parseJson.id)                           // key by the id located in the json string
>   .groupByKey(number_of_group_tasks)               // group by id
>   .flatMap { case (key, lines) => /* do some stuff */ }
>
> In the web view I can see the keyBy operation doing a shuffle write. If I
> understand correctly, the groupByKey transformation creates a wide RDD
> dependency, thus requiring a shuffle write. I have already increased
> spark.akka.askTimeout to 30 seconds, and the job still fails with errors on
> the workers:
>
> Error communicating with MapOutputTracker
>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>         at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>         at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>         at akka.dispatch.Await$.result(Future.scala:74)
>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>         ... 25 more
>
> Before the error I can see this kind of logs:
>
> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>
> Can you please help me understand what is going on? Is the whole shuffle
> write RDD kept in memory, and when the cluster runs out of memory does it
> start garbage collecting and re-fetching from S3?
>
> If this is the case, does Spark require additional configuration for
> effective shuffle writes to disk?
>
> Regards,
> Domen
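For reference, a minimal sketch of the quoted pipeline with a larger, explicit partition count for groupByKey, so that each reduce task handles a smaller slice of the data set (the partition count and the parseId helper below are illustrative assumptions, not values from the original job):

// Sketch only: pick the partition count based on total cores and data size.
val numberOfGroupTasks = 512                            // illustrative; larger than the default
val processed = sc.textFile(s3url)
  .keyBy(line => parseId(line))                         // stand-in for _.parseJson.id
  .groupByKey(numberOfGroupTasks)                       // more, smaller reduce-side partitions
  .flatMap { case (id, lines) => Seq.empty[String] }    // "do some stuff" goes here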