I sure do! [1] And yes, I'm really hoping they will chime in; otherwise I may dig a little deeper myself and start filing some JIRA tickets.
[1] http://www.slideshare.net/cjnolet

On Mon, Feb 8, 2016 at 3:02 AM, Igor Berman <igor.ber...@gmail.com> wrote:

It's interesting to see what the Spark dev people will say. Corey, do you have the presentation available online?

On 8 February 2016 at 05:16, Corey Nolet <cjno...@gmail.com> wrote:

Charles,

Thank you for chiming in, and I'm glad someone else is experiencing this too and not just me. I know very well how the Spark shuffles work, and I've done deep-dive presentations @ Spark meetups in the past. This problem is something that goes beyond that and, I believe, it exposes a fundamental paradigm flaw in the design of Spark, unfortunately. The good thing is, I think it can be fixed.

Also, in regards to how much data actually gets shuffled: believe it or not, this problem can take a 30-40 minute job and make it run for 4 or more hours. If I let the job run for 4+ hours, the amount of data being shuffled for this particular dataset will be 100 or more TB. Usually, however, I end up killing the job long before that point because I realize it should not be taking this long. The particular dataset we're working with is not for real-time exploration. These are very large joins we're doing for jobs that we run a few times a day.

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com> wrote:

"The dataset is 100gb at most, the spills can be up to 10T-100T"

-- I have had the same experience, although not to this extreme (the spills were < 10T while the input was ~100s of GB), and haven't found any solution yet. I don't believe this is related to the input data format. In my case, I got my input data by loading from Hive tables.

On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:

Hi Corey,

"The dataset is 100gb at most, the spills can be up to 10T-100T" -- are your input files in LZO format, and do you use sc.textFile()? If memory is not enough, Spark will spill 3-4x the input data to disk.

------------------ Original Message ------------------
From: "Corey Nolet" <cjno...@gmail.com>
Sent: Sunday, February 7, 2016, 8:56 PM
To: "Igor Berman" <igor.ber...@gmail.com>
Cc: "user" <user@spark.apache.org>
Subject: Re: Shuffle memory woes

As for the second part of your question: we have a fairly complex join process which requires a ton of stage orchestration from our driver. I've written some code to walk down our DAG tree and execute siblings in the tree concurrently where possible (forcing cache to disk on children that have multiple children themselves so that they can be run concurrently). Ultimately, we have seen significant speedup in our jobs by keeping tasks as busy as possible processing concurrent stages. Funnily enough, though, the stage that is causing problems with shuffling for us has a lot of children and doesn't even run concurrently with any other stages, so I ruled out the concurrency of the stages as a culprit for the shuffling problem we're seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:

Igor,

I don't think the question is "why can't it fit stuff in memory". I know why it can't fit stuff in memory: because it's a large dataset that needs to have a reduceByKey() run on it.
My understanding is that when it doesn't fit into memory, it needs to spill in order to consolidate intermediary files into a single file. The more data you need to run through this, the more it will need to spill. My finding is that once it gets stuck in this spill chain with our dataset, it's all over at that point, because it will spill and spill and spill and spill and spill. If I give the shuffle enough memory it won't, irrespective of the number of partitions we have (I've done everything from repartition(500) to repartition(2500)). It's not a matter of running out of memory on a single node because the data is skewed. It's more a matter of the shuffle buffer filling up and needing to spill. I think what may be happening is that it gets to a point where it's spending more time reading/writing from disk while doing the spills than it is actually processing any data. I can tell this because I can see that the spills sometimes get up into the 10s to 100s of TB, where the input data was maybe 100gb at most. Unfortunately my code is on a private internal network and I'm not able to share it.

On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:

So can you provide code snippets? In particular, it would be interesting to see what your transformation chain is and how many partitions there are on each side of the shuffle operation.

The question is why it can't fit stuff in memory when you are shuffling. Maybe your partitioner on the "reduce" side is not configured properly? I mean, if the map side is OK and you are just reducing by key or something, it should be OK, so some detail is missing... skewed data? aggregate by key?

On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:

Igor,

Thank you for the response, but unfortunately the problem I'm referring to goes beyond this. I have set the shuffle memory fraction to 90% and set the cache memory fraction to 0. Repartitioning the RDD helped a tad on the map side, but didn't do much for the spilling when there was no longer any memory left for the shuffle. Also, the new automatic memory management doesn't seem like it'll have much of an effect after I've already given most of the memory I've allocated to the shuffle. The problem I'm having is most specifically related to the shuffle performance declining by several orders of magnitude when it needs to spill multiple times (it ends up spilling several hundred times for me when it can't fit stuff into memory).

On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com> wrote:

Hi,

Usually you can solve this in two steps: make the RDD have more partitions, and play with the shuffle memory fraction.

In Spark 1.6 the cache vs. shuffle memory fractions are adjusted automatically.

On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:

I just recently discovered that my jobs were taking several hours to complete because of excess shuffle spills.
What I found was that when I hit the high point where I didn't have enough memory for the shuffles to store all of their file consolidations at once, it could spill so many times that it caused my job's runtime to increase by orders of magnitude (and sometimes to fail altogether).

I've played with all the tuning parameters I can find. To speed the shuffles up, I tuned the Akka threads to different values. I also tuned the shuffle buffering a tad (both up and down).

I feel like I see a weak point here. The mappers are sharing memory space with the reducers, and the shuffles need enough memory to consolidate and pull; otherwise they will need to spill and spill and spill. What I've noticed about my jobs is that this is the difference between them taking 30 minutes and 4 hours or more. Same job, just different memory tuning.

I've found that, as a result of the spilling, I'm better off not caching any data in memory, lowering my storage fraction to 0, and still hoping I was able to give my shuffles enough memory that my data doesn't continuously spill. Is this the way it's supposed to be? It makes things hard, because it effectively forces memory limits on my job; otherwise it could take orders of magnitude longer to execute.
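For reference, here is a minimal sketch of the kind of tuning discussed in this thread: most executor memory handed to the shuffle, no in-memory caching, more partitions ahead of the reduceByKey(), and a disk-only persist on a stage whose output feeds several downstream stages. The input path, key extraction, and partition count are placeholders rather than the actual job described above, and the two memoryFraction settings are the legacy pre-1.6 knobs.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ShuffleTuningSketch {
  def main(args: Array[String]): Unit = {
    // Legacy (pre-1.6) memory knobs: give ~90% of executor memory to the
    // shuffle and nothing to the block-store cache, as described in the thread.
    val conf = new SparkConf()
      .setAppName("shuffle-tuning-sketch")
      .set("spark.shuffle.memoryFraction", "0.9")
      .set("spark.storage.memoryFraction", "0.0")
    val sc = new SparkContext(conf)

    // Placeholder input; the real jobs in the thread load from Hive / internal data.
    val records = sc.textFile("hdfs:///data/events")
      .map(line => (line.split("\t")(0), 1L))

    // More partitions means smaller per-task shuffle blocks: the first thing
    // to try before each reduce task starts spilling repeatedly.
    val counts = records
      .repartition(2500)
      .reduceByKey(_ + _)

    // If a stage feeds several sibling stages that will run concurrently,
    // persisting its output to disk avoids recomputing it for each child.
    counts.persist(StorageLevel.DISK_ONLY)

    println(counts.count())
    sc.stop()
  }
}

In Spark 1.6+ the unified memory manager replaces the two fractions above with spark.memory.fraction (the "auto-memory management" mentioned in the thread); the legacy settings only take effect there when spark.memory.useLegacyMode is enabled.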