Thanks Nilesh, thanks for sharing those docs. I have come across most of those tuning tips in the past and, believe me, I have tuned the heck out of this job. What I can't believe is that Spark needs 4x more resources than MapReduce to run the same job (for a dataset on the order of >100GB). I was able to run my job only after giving it a ridiculous amount of executor memory (20g) and overhead (4g)! The MapReduce counterpart worked with just a 2gb mapper and a 6gb reducer!
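To put numbers on that, the memory-related settings that finally let the Spark job finish, next to the MapReduce equivalents, look roughly like this (only the memory keys are shown; the MapReduce property names are the standard YARN ones and are written from memory here, not copied from our job config):

    spark.executor.memory                 20g
    spark.yarn.executor.memoryOverhead    4096    (in MB, i.e. ~4g)

    mapreduce.map.memory.mb               2048
    mapreduce.reduce.memory.mb            6144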
I am still not able to take a heap dump of a stuck Spark executor so that I can analyze memory usage. But there's definitely some limitation in the framework which doesn't make it ideal at all for shuffle-heavy jobs! (What I plan to try next for the heap dump, and the only groupBy restructuring I can think of, are sketched at the bottom of this mail.)

On Sat, Feb 20, 2016 at 10:29 PM, Kuchekar <kuchekar.nil...@gmail.com> wrote:

> Hi Nirav,
>
> I recently attended Spark Summit East 2016, and almost every talk about
> errors faced by the community and/or tuning topics for Spark mentioned this
> as the main problem (executor lost and JVM out of memory).
>
> Check out this blog post that explains how to tune Spark
> <http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/>
> and this cheatsheet for tuning Spark
> <http://techsuppdiva.github.io/spark1.6.html>.
>
> Hope this helps; keep the community posted on what resolved your issue, if
> it does.
>
> Thanks.
>
> Kuchekar, Nilesh
>
> On Sat, Feb 20, 2016 at 11:29 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Thanks Nilesh. I don't think there's heavy communication between driver
>> and executor. However, I'll try the settings you suggested.
>>
>> I cannot replace groupBy with reduceByKey as it's not an associative
>> operation.
>>
>> It is very frustrating, to be honest. It was a piece of cake with MapReduce
>> compared to the amount of time I am putting into tuning Spark to make
>> things work. To remove any doubt that an executor might be running multiple
>> tasks (executor.cores) and hence splitting memory between them, I set
>> executor.cores to 1 so that a single task has all 15GB at its disposal,
>> which is already 3 times what it needs for the most skewed key. I am going
>> to need to profile for sure to understand what the Spark executors are
>> doing there. For sure they are not willing to explain the situation but
>> rather will say 'use reduceByKey'.
>>
>> On Thu, Feb 11, 2016 at 9:42 AM, Kuchekar <kuchekar.nil...@gmail.com> wrote:
>>
>>> Hi Nirav,
>>>
>>> I faced a similar issue with Yarn, EMR 1.5.2, and the following Spark conf
>>> helped me. You can set the values accordingly:
>>>
>>> conf = (SparkConf().set("spark.master", "yarn-client").setAppName("HalfWay")
>>>         .set("spark.driver.memory", "15G").set("spark.yarn.am.memory", "15G"))
>>>
>>> conf = (conf.set("spark.driver.maxResultSize", "10G")
>>>         .set("spark.storage.memoryFraction", "0.6")
>>>         .set("spark.shuffle.memoryFraction", "0.6")
>>>         .set("spark.yarn.executor.memoryOverhead", "4000"))
>>>
>>> conf = (conf.set("spark.executor.cores", "4")
>>>         .set("spark.executor.memory", "15G")
>>>         .set("spark.executor.instances", "6"))
>>>
>>> Is it also possible to use reduceByKey in place of groupBy? That might
>>> help the shuffling too.
>>>
>>> Kuchekar, Nilesh
>>>
>>> On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>
>>>> We have been trying to solve a memory issue with a Spark job that
>>>> processes 150GB of data (on disk). It does a groupBy operation; some of
>>>> the executors will receive somewhere around 2-4M Scala case objects to
>>>> work with. We are using the following Spark config:
>>>>
>>>> "executorInstances": "15",
>>>> "executorCores": "1", (we reduced it to one so that a single task gets
>>>> all the executorMemory! at least that's the assumption here)
>>>> "executorMemory": "15000m",
>>>> "minPartitions": "2000",
>>>> "taskCpus": "1",
>>>> "executorMemoryOverhead": "1300",
>>>> "shuffleManager": "tungsten-sort",
>>>> "storageFraction": "0.4"
>>>>
>>>> This is a snippet of what we see in the Spark UI for a job that fails.
>>>>
>>>> This is a *stage* of this job that fails:
>>>> Stage Id: 5 (retry 15)
>>>> Pool Name: prod
>>>>   <http://hdn7:18080/history/application_1454975800192_0447/stages/pool?poolname=prod>
>>>> Description: map at SparkDataJobs.scala:210
>>>>   <http://hdn7:18080/history/application_1454975800192_0447/stages/stage?id=5&attempt=15>
>>>> Submitted: 2016/02/09 21:30:06
>>>> Duration: 13 min
>>>> Tasks: Succeeded/Total: 130/389 (16 failed)
>>>> Shuffle Read / Shuffle Write: 1982.6 MB / 818.7 MB
>>>> Failure Reason: org.apache.spark.shuffle.FetchFailedException: Error in opening
>>>> FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/fasd/appcache/application_1454975800192_0447/blockmgr-abb77b52-9761-457a-b67d-42a15b975d76/0c/shuffle_0_39_0.data,
>>>> offset=11421300, length=2353}
>>>>
>>>> This is one of the single *task* attempts from the above stage that threw an OOM:
>>>>
>>>> Index 2, Task 22361, Attempt 0, FAILED, PROCESS_LOCAL, executor 38 / nd1.mycom.local,
>>>> launched 2016/02/09 22:10:42, duration 5.2 min, GC time 1.6 min,
>>>> shuffle read 7.4 MB / 375509 records
>>>>
>>>> java.lang.OutOfMemoryError: Java heap space
>>>> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>>>> at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>>>> at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>>>> at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>>>> at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>> at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>>>> at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>>>> at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>>>> at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>>>> at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>>>> at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:3
>>>>
>>>> None of the above suggests that it went over the 15GB of memory that I
>>>> initially allocated. So what am I missing here? What's eating my memory?
>>>>
>>>> We tried executor JavaOpts to get a heap dump, but it doesn't seem to work:
>>>>
>>>> -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
>>>> -XX:HeapDumpPath=/opt/cores/spark
>>>>
>>>> I don't see any cores being generated, nor can I find a heap dump anywhere
>>>> in the logs.
>>>>
>>>> Also, how do I find the YARN container ID for a given Spark executor ID, so
>>>> that I can investigate the YARN NodeManager and ResourceManager logs for
>>>> that particular container?
>>>>
>>>> PS - The job does not do any caching of intermediate RDDs, as each RDD is
>>>> used just once for the subsequent step. We use Spark 1.5.2 on YARN in
>>>> yarn-client mode.
>>>> Thanks
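Separately, on the heap dump: since the OnOutOfMemoryError options quoted above have not produced anything so far, the next thing I'm going to try is grabbing a dump by hand from the node while an executor is stuck. A rough recipe, assuming the executor JVM shows up under jps as CoarseGrainedExecutorBackend (the output path below is just an example):

    jps | grep CoarseGrainedExecutorBackend          # find the executor's pid on that node
    jmap -dump:live,format=b,file=/tmp/executor.hprof <pid>

The resulting .hprof file can then be opened in Eclipse MAT or VisualVM to see what is actually holding the heap. As for mapping executors to YARN containers, in YARN mode the stdout/stderr links on the Spark UI's Executors tab point at the NodeManager's container log directory, so the container_* ID for each executor is visible in that URL.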
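And one more thought on the reduceByKey suggestion: since our per-key logic is order-sensitive and not associative, the only restructuring I can think of that keeps memory bounded is to sort by key within partitions and stream each key's values off the sorted iterator, instead of collecting whole groups with groupBy. Below is a minimal sketch of the idea only; the String/Long types, the partition count, and the combine function are placeholders, not the real job's logic:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // Placeholder for the real order-sensitive per-key computation.
    def combine(acc: Long, v: Long): Long = acc + v

    def streamedGroup(pairs: RDD[(String, Long)]): RDD[(String, Long)] = {
      // One shuffle, sorted by key inside each partition, so all values for a key
      // arrive contiguously and can be folded one at a time instead of being
      // materialized into a single in-memory collection per key.
      val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2000))
      sorted.mapPartitions { iter =>
        val out = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
        var currentKey: String = null
        var acc = 0L
        iter.foreach { case (k, v) =>
          if (k == currentKey) {
            acc = combine(acc, v)                              // fold the next value in
          } else {
            if (currentKey != null) out += ((currentKey, acc)) // emit the finished key
            currentKey = k
            acc = v
          }
        }
        if (currentKey != null) out += ((currentKey, acc))
        out.iterator
      }
    }

This only helps if the per-key computation can consume its values in one pass over the key-sorted stream; if it genuinely needs the whole group materialized at once, it won't reduce the peak per-key footprint.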