I have the same issue (I'm using the latest 1.1.0-SNAPSHOT).

I've increased my driver memory to 30G, executor memory to 10G,
and spark.akka.askTimeout to 180. Still no good. My other configurations
are:

spark.serializer
 org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.mb          256
spark.shuffle.consolidateFiles          true
spark.shuffle.file.buffer.kb            400
spark.akka.frameSize                    500
spark.akka.timeout                      600
spark.akka.askTimeout                   180
spark.core.connection.auth.wait.timeout 300

However I just got informed that the YARN cluster I'm using *throttles the
resource for default queue. *Not sure if it's related.


Jianshi




On Wed, Aug 27, 2014 at 5:15 AM, Andrew Ash <and...@andrewash.com> wrote:

> Hi Grega,
>
> Did you ever get this figured out?  I'm observing the same issue in Spark
> 1.0.2.
>
> For me it was after 1.5hr of a large .distinct call, followed by a
> .saveAsTextFile()
>
>  14/08/26 20:57:43 INFO executor.CoarseGrainedExecutorBackend: Got
> assigned task 18500
>  14/08/26 20:57:43 INFO executor.Executor: Running task ID 18500
>  14/08/26 20:57:43 INFO storage.BlockManager: Found block broadcast_0
> locally
>  14/08/26 20:57:43 INFO spark.MapOutputTrackerWorker: Don't have map
> outputs for shuffle 0, fetching them
>  14/08/26 20:58:13 ERROR executor.Executor: Exception in task ID 18491
>  org.apache.spark.SparkException: Error communicating with MapOutputTracker
>          at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:108)
>          at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:155)
>          at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:42)
>          at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:65)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>          at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>          at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>          at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>          at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>          at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>          at org.apache.spark.scheduler.Task.run(Task.scala:51)
>          at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>          at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>          at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>          at java.lang.Thread.run(Thread.java:745)
>  Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [30 seconds]
>          at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>          at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>          at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>          at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>          at scala.concurrent.Await$.result(package.scala:107)
>          at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:105)
>          ... 23 more
>
>
> On Tue, Mar 11, 2014 at 3:07 PM, Grega Kespret <gr...@celtra.com> wrote:
>
>> > Your input data read as RDD may be causing OOM, so thats where you can
>> use different memory configuration.
>>
>> We are not getting any OOM exceptions, just akka future timeouts in
>> mapoutputtracker and unsuccessful get of shuffle outputs, therefore
>> refetching them.
>>
>> What is the industry practice when going about debugging such errors?
>>
>> Questions:
>> - why are mapoutputtrackers timing out? ( and how to debug this properly?)
>> - what is the task/purpose of mapoutputtracker?
>> - how to check per-task objects size?
>>
>> Thanks,
>> Grega
>>
>> On 11 Mar 2014, at 18:43, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>
>> Shuffle data is always stored on disk, its unlikely to cause OOM. Your
>> input data read as RDD may be causing OOM, so thats where you can use
>> different memory configuration.
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>
>>
>>
>> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <do...@celtra.com> wrote:
>>
>>> I don't understand how exactly will that help. There are no persisted
>>> RDD's in storage. Our input data is ~ 100GB, but output of the flatMap is
>>> ~40Mb. The small RDD is then persisted.
>>>
>>> Memory configuration should not affect shuffle data if I understand you
>>> correctly?
>>>
>>>
>>>
>>>
>>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User
>>> List] <[hidden email]
>>> <http://user/SendEmail.jtp?type=node&node=2537&i=0>> wrote:
>>>
>>>> Shuffle data is not kept in memory. Did you try additional memory
>>>> configurations(
>>>> https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence
>>>> )
>>>>
>>>> Mayur Rustagi
>>>> Ph: <a href="tel:%2B1%20%28760%29%20203%203257" value="+17602033257"
>>>> target="_blank">+1 (760) 203 3257
>>>> http://www.sigmoidanalytics.com
>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>
>>>>
>>>>
>>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]
>>>> <http://user/SendEmail.jtp?type=node&node=2534&i=0>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I have a spark cluster with 4 workers each with 13GB ram. I would like
>>>>> to process a large data set (does not fit in memory) that consists of JSON
>>>>> entries. These are the transformations applied:
>>>>>
>>>>> SparkContext.textFile(s3url). // read files from s3
>>>>> keyBy(_.parseJson.id) // key by id that is located in json string
>>>>> groupByKey(number_of_group_tasks) //group by id
>>>>> flatMap(case (key,lines) => { //do some stuff })
>>>>>
>>>>> In the web view I can see a key by operation doing a shuffle write. If
>>>>> I understand correctly the groupByKey transformation creates a wide RDD
>>>>> dependency thus requiring a shuffle write. I have already increased
>>>>> spark.akka.askTimeout to 30 seconds and still job fails with errors
>>>>> on workers:
>>>>>
>>>>> Error communicating with MapOutputTracker
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>>         at
>>>>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>>         at
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>>         at
>>>>> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>>         at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>>         at
>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>         at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>         at
>>>>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>         at java.lang.Thread.run(Thread.java:724)
>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>> after [30000] milliseconds
>>>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>>         ... 25 more
>>>>>
>>>>>
>>>>> Before the error I can see this kind of logs:
>>>>>
>>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for
>>>>> shuffle 0, fetching them 14/03/11 14:29:40 INFO MapOutputTracker: Don't
>>>>> have map outputs for shuffle 0, fetching them 14/03/11 14:29:40 INFO
>>>>> MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>>>
>>>>> Can you please help me understand what is going on? Is the whole
>>>>> shuffle write RDD kept in memory and when cluster runs out of memory it
>>>>> starts garbage collecting and re fetching from s3?
>>>>>
>>>>> If this is the case does spark require additional configuration for
>>>>> effective shuffle write to disk?
>>>>>
>>>>> Regards, Domen
>>>>>
>>>>
>>>>
>>>>
>>>> ------------------------------
>>>>  If you reply to this email, your message will be added to the
>>>> discussion below:
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Out-of-memory-on-large-RDDs-tp2533p2534.html
>>>>  To start a new topic under Apache Spark User List, email [hidden
>>>> email] <http://user/SendEmail.jtp?type=node&node=2537&i=1>
>>>> To unsubscribe from Apache Spark User List, click here.
>>>> NAML
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>>
>>>
>>>
>>> ------------------------------
>>> View this message in context: Re: Out of memory on large RDDs
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Re-Out-of-memory-on-large-RDDs-tp2533p2537.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>
>>
>>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Reply via email to