I have the same issue (I'm using the latest 1.1.0-SNAPSHOT). I've increased my driver memory to 30G, executor memory to 10G, and spark.akka.askTimeout to 180. Still no good. My other configurations are:
spark.serializer                          org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb            256
spark.shuffle.consolidateFiles            true
spark.shuffle.file.buffer.kb              400
spark.akka.frameSize                      500
spark.akka.timeout                        600
spark.akka.askTimeout                     180
spark.core.connection.auth.wait.timeout   300

However, I was just informed that the YARN cluster I'm using *throttles resources for the default queue*. Not sure if that's related.

Jianshi

On Wed, Aug 27, 2014 at 5:15 AM, Andrew Ash <and...@andrewash.com> wrote:
> Hi Grega,
>
> Did you ever get this figured out? I'm observing the same issue in Spark 1.0.2.
>
> For me it was after 1.5 hr of a large .distinct call, followed by a .saveAsTextFile()
>
> 14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 18500
> 14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500
> 14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0 locally
> 14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
> 14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491
> org.apache.spark.SparkException: Error communicating with MapOutputTracker
>     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
>     at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155)
>     at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>     at scala.concurrent.Await$.result(package.scala:107)
>     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105)
>     ... 23 more
>
>
> On Tue, Mar 11, 2014 at 3:07 PM, Grega Kespret <gr...@celtra.com> wrote:
>
>> > Your input data read as RDD may be causing OOM, so that's where you can use a different memory configuration.
>>
>> We are not getting any OOM exceptions, just Akka future timeouts in the MapOutputTracker and unsuccessful gets of shuffle outputs, which are therefore re-fetched.
>>
>> What is the industry practice when going about debugging such errors?
>>
>> Questions:
>> - Why are the MapOutputTrackers timing out? (And how to debug this properly?)
>> - What is the task/purpose of the MapOutputTracker?
>> - How to check per-task object sizes?
>>
>> Thanks,
>> Grega
>>
>> On 11 Mar 2014, at 18:43, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>> Shuffle data is always stored on disk, so it's unlikely to cause OOM. Your input data read as an RDD may be causing OOM, so that's where you can use a different memory configuration.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:
>>
>>> I don't understand how exactly that will help. There are no persisted RDDs in storage. Our input data is ~100 GB, but the output of the flatMap is ~40 MB. The small RDD is then persisted.
>>>
>>> Memory configuration should not affect shuffle data, if I understand you correctly?
>>>
>>>
>>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] <[hidden email]> wrote:
>>>
>>>> Shuffle data is not kept in memory. Did you try additional memory configurations?
>>>> (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)
>>>>
>>>> Mayur Rustagi
>>>> Ph: +1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>>
>>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to process a large data set (it does not fit in memory) that consists of JSON entries. These are the transformations applied:
>>>>>
>>>>>     SparkContext.textFile(s3url)            // read files from S3
>>>>>       .keyBy(_.parseJson.id)                // key by the id located in the JSON string
>>>>>       .groupByKey(number_of_group_tasks)    // group by id
>>>>>       .flatMap { case (key, lines) => /* do some stuff */ }
>>>>>
>>>>> In the web view I can see the keyBy operation doing a shuffle write. If I understand correctly, the groupByKey transformation creates a wide RDD dependency, thus requiring a shuffle write.
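For context, a minimal, self-contained version of the pipeline Domen describes above might look like the sketch below. It is illustrative only: the extractId helper, the S3 paths, and the numGroupTasks value are hypothetical stand-ins (the original uses an unshown parseJson helper and number_of_group_tasks), and the flatMap body just emits one summary line per id in place of "do some stuff".

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD functions such as groupByKey

    object GroupByIdJob {
      // Hypothetical helper standing in for _.parseJson.id: pull the "id" field
      // out of a JSON line with a regex (any real JSON library would do).
      private val IdPattern = "\"id\"\\s*:\\s*\"([^\"]*)\"".r
      def extractId(line: String): String =
        IdPattern.findFirstMatchIn(line).map(_.group(1)).getOrElse("unknown")

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("group-by-id"))
        val numGroupTasks = 2000                        // assumption: tune to the data size

        sc.textFile("s3n://bucket/input/")              // read JSON lines from S3
          .keyBy(extractId)                             // key each line by its id
          .groupByKey(numGroupTasks)                    // wide dependency -> shuffle write
          .flatMap { case (id, lines) =>                // "do some stuff" per group
            Seq(s"$id\t${lines.size}")                  // e.g. one summary line per id
          }
          .saveAsTextFile("s3n://bucket/output/")

        sc.stop()
      }
    }

The groupByKey(numGroupTasks) step is the wide dependency that produces the shuffle write visible in the web UI.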
>>>>> I have already increased spark.akka.askTimeout to 30 seconds, and the job still fails with errors on the workers:
>>>>>
>>>>> Error communicating with MapOutputTracker
>>>>>     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>>     at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>>     at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>>     at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
>>>>>     at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>>     at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>>     at akka.dispatch.Await$.result(Future.scala:74)
>>>>>     at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>>     ... 25 more
>>>>>
>>>>> Before the error I see this kind of log output:
>>>>>
>>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>>
>>>>> Can you please help me understand what is going on? Is the whole shuffle-write RDD kept in memory, and when the cluster runs out of memory does it start garbage collecting and re-fetching from S3?
>>>>>
>>>>> If this is the case, does Spark require additional configuration for effective shuffle writes to disk?
>>>>>
>>>>> Regards,
>>>>> Domen


--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
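For reference, the timeout and shuffle settings discussed in this thread (Jianshi's list above, and the spark.akka.askTimeout increase Domen mentions) can be supplied either in conf/spark-defaults.conf or programmatically. A minimal sketch follows, using only the Spark 1.x-era property names and values quoted in the thread; it is illustrative, not a recommended configuration, and driver/executor memory would normally be passed to spark-submit (e.g. --driver-memory 30g --executor-memory 10g) rather than set here.

    import org.apache.spark.{SparkConf, SparkContext}

    // The same key/value pairs could instead go in conf/spark-defaults.conf.
    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "256")
      .set("spark.shuffle.consolidateFiles", "true")
      .set("spark.shuffle.file.buffer.kb", "400")
      .set("spark.akka.frameSize", "500")
      .set("spark.akka.timeout", "600")
      .set("spark.akka.askTimeout", "180")   // the timeout the thread suggests raising for the MapOutputTracker asks
      .set("spark.core.connection.auth.wait.timeout", "300")

    val sc = new SparkContext(conf)

Whether raising these timeouts actually helps depends on why the MapOutputTracker is slow to respond in the first place (garbage-collection pressure, an overloaded cluster, or the queue throttling Jianshi mentions), which is the open question in the thread.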