Hi, Corey,
   "The dataset is 100gb at most, the spills can go up to 10T-100T." Are your
input files in LZO format, and are you using sc.textFile()? If memory is not
enough, Spark will spill 3-4x the input data to disk.
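A minimal sketch of the setup being asked about (the path and partition count
below are placeholders, not details from the thread):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: reading compressed text input and spreading it out before any wide
    // operation. A non-splittable .lzo file (without an index) arrives as a single
    // partition, so the decompressed data - several times the on-disk size - is
    // concentrated in very few tasks and is far more likely to spill when shuffled.
    val sc = new SparkContext(new SparkConf().setAppName("spill-sketch"))
    val raw = sc.textFile("hdfs:///path/to/input")   // placeholder path
    val spread = raw.repartition(2000)               // illustrative partition count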




------------------ Original Message ------------------
From: "Corey Nolet" <cjno...@gmail.com>;
Date: Sunday, February 7, 2016, 8:56
To: "Igor Berman" <igor.ber...@gmail.com>;
Cc: "user" <user@spark.apache.org>;
Subject: Re: Shuffle memory woes



As for the second part of your questions- we have a fairly complex join process
which requires a ton of stage orchestration from our driver. I've written some
code to be able to walk down our DAG tree and execute siblings in the tree
concurrently where possible (forcing cache to disk on children that have
multiple children themselves so that they can be run concurrently). Ultimately,
we have seen significant speedup in our jobs by keeping tasks as busy as
possible processing concurrent stages. Funny enough though, the stage that is
causing problems with shuffling for us has a lot of children and doesn't even
run concurrently with any other stages, so I ruled out the concurrency of the
stages as a culprit for the shuffling problem we're seeing.
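(Roughly, the orchestration pattern looks like the sketch below; the RDD, the
transforms, and the output paths are placeholders since the real code can't be
shared:)

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Sketch: a child RDD with multiple downstream consumers is forced to disk,
    // then each sibling's action is submitted on its own thread so that their
    // stages can run concurrently.
    def runSiblings(shared: RDD[(String, Long)]): Unit = {
      shared.persist(StorageLevel.DISK_ONLY)
      shared.count()                                                            // materialize once
      val left  = Future { shared.mapValues(_ + 1).saveAsTextFile("/tmp/out-a") }   // placeholder branch
      val right = Future { shared.filter(_._2 > 0).saveAsTextFile("/tmp/out-b") }   // placeholder branch
      Await.result(Future.sequence(Seq(left, right)), Duration.Inf)
    }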

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
Igor,

I don't think the question is "why can't it fit stuff in memory". I know why it 
can't fit stuff in memory- because it's a large dataset that needs to have a 
reduceByKey() run on it. My understanding is that when it doesn't fit into 
memory it needs to spill in order to consolidate intermediary files into a 
single file. The more data you need to run through this, the more it will need 
to spill. My finding is that once it gets stuck in this spill chain with our
dataset it's all over at that point because it will spill and spill and spill
and spill and spill. If I give the shuffle enough memory it won't- irrespective
of the number of partitions we have (I've done everything from repartition(500)
to repartition(2500)). It's not a matter of running out of memory on a single
node because the data is skewed. It's more a matter of the shuffle buffer
filling up and needing to spill. I think what may be happening is that it gets
to a point where it's spending more time reading/writing from disk while doing
the spills than it is actually processing any data. I can tell this because I
can see that the spills sometimes get up into the 10's to 100's of TB where the
input data was maybe 100gb at most. Unfortunately my code is
on a private internal network and I'm not able to share it.
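(A toy sketch of the shape of the operation in question - the key type and the
partition count passed to reduceByKey are illustrative only:)

    import org.apache.spark.rdd.RDD

    // Sketch: the reduce-side parallelism can be passed directly to reduceByKey
    // rather than via a separate repartition(). Each reduce task only has to hold
    // its own partition's aggregation buffer in memory, and spills to disk once
    // that buffer no longer fits in the execution memory it was granted.
    def aggregate(pairs: RDD[(String, Long)]): RDD[(String, Long)] =
      pairs.reduceByKey(_ + _, 2500)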


On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:
so can you provide code snippets? Especially, it would be interesting to see your
transformation chain and how many partitions there are on each side of the
shuffle operation.

the question is why it can't fit stuff in memory when you are shuffling - maybe
your partitioner on the "reduce" side is not configured properly? I mean, if the
map side is ok and you are just reducing by key or something, it should be ok, so
some detail is missing... skewed data? aggregate by key?
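(One quick way to answer the skew question, as a sketch - the pair RDD here is
just a stand-in:)

    import org.apache.spark.rdd.RDD

    // Sketch: count records per key and print the heaviest keys. A handful of keys
    // holding most of the data would explain one reducer's buffer filling up no
    // matter how many partitions are used.
    def topKeys(pairs: RDD[(String, Long)]): Unit =
      pairs.mapValues(_ => 1L)
           .reduceByKey(_ + _)
           .top(10)(Ordering.by(_._2))
           .foreach { case (k, n) => println(s"$k -> $n records") }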


On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
Igor,

Thank you for the response but unfortunately, the problem I'm referring to goes
beyond this. I have set the shuffle memory fraction to be 90% and set the cache
memory to be 0. Repartitioning the RDD helped a tad on the map side but didn't
do much for the spilling when there was no longer any memory left for the
shuffle. Also, the new auto-memory management doesn't seem like it'll have too
much of an effect after I've already given most of the memory I've allocated to
the shuffle. The problem I'm having is most specifically related to the shuffle
performance declining by several orders of magnitude when it needs to spill
multiple times (it ends up spilling several hundred times for me when it can't
fit stuff into memory).
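For reference, the configuration being described, using the Spark 1.x legacy
property names (the app name is a placeholder):

    import org.apache.spark.{SparkConf, SparkContext}

    // Hand nearly all of the executor heap to the shuffle and reserve nothing
    // for cached RDDs (pre-1.6 memory model).
    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")              // placeholder name
      .set("spark.shuffle.memoryFraction", "0.9")   // default is 0.2
      .set("spark.storage.memoryFraction", "0.0")   // default is 0.6
    val sc = new SparkContext(conf)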






On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com> wrote:
Hi, usually you can solve this in 2 steps:
make the rdd have more partitions
play with the shuffle memory fraction


in Spark 1.6, the cache vs. shuffle memory fractions are adjusted automatically
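(In configuration terms, a rough sketch of what that means - the values shown
are the 1.6 defaults:)

    import org.apache.spark.SparkConf

    // The 1.6 unified memory manager shares one pool between execution (shuffle)
    // and storage (cache); storage can be evicted down to storageFraction, so the
    // old shuffle-vs-cache split no longer has to be tuned by hand.
    val unified = new SparkConf()
      .set("spark.memory.fraction", "0.75")         // default in 1.6
      .set("spark.memory.storageFraction", "0.5")   // default in 1.6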


On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
I just recently had a discovery that my jobs were taking several hours to
complete because of excess shuffle spills. What I found was that when I hit
the high point where I didn't have enough memory for the shuffles to store all
of their file consolidations at once, it could spill so many times that it
caused my job's runtime to increase by orders of magnitude (and sometimes fail
altogether).


I've played with all the tuning parameters I can find. To speed the shuffles 
up, I tuned the akka threads to different values. I also tuned the shuffle 
buffering a tad (both up and down). 
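(For anyone searching later, these are the properties meant here - Spark 1.x
names, with purely illustrative values:)

    import org.apache.spark.SparkConf

    // The two knobs referred to above; the values are examples, not recommendations.
    val tuning = new SparkConf()
      .set("spark.akka.threads", "8")             // actor threads for internal messaging (default 4)
      .set("spark.shuffle.file.buffer", "64k")    // per shuffle file writer buffer (default 32k)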


I feel like I see a weak point here. The mappers are sharing memory space with
the reducers, and the shuffles need enough memory to consolidate and pull; otherwise
they will need to spill and spill and spill. What I've noticed about my jobs is
that this is the difference between them taking 30 minutes and 4 hours or more.
Same job- just different memory tuning.


I've found that, as a result of the spilling, I'm better off not caching any
data in memory, lowering my storage fraction to 0, and just hoping I was
able to give my shuffles enough memory that my data doesn't continuously spill.
Is this the way it's supposed to be? It makes things hard because it seems like it
forces memory limits on my job- otherwise it could take orders of magnitude
longer to execute.
