Yeah, I tried with 10k and 30k and those still failed, so I'll try with more then. Though that is a little disappointing: it only writes ~7TB of shuffle data, which in theory shouldn't require more than 1000 reducers on my 10TB-memory cluster (~7GB of spill per reducer). I'm now wondering if my shuffle partitions are uneven and I should use a custom partitioner. Is there a way to get stats on the partition sizes from Spark?
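Worst case I can do something quick-and-dirty like the sketch below to check for skew (it only counts rows per partition, so it's an approximation of byte sizes, and `joined` is just a stand-in for the actual join result), but I was hoping Spark already exposes this somewhere:

import org.apache.spark.sql.DataFrame

// Rough sketch: count rows in each partition of the join result and sort
// descending. Large outliers usually point to key skew. Row counts are only
// a proxy for bytes, but they're cheap to get.
def partitionSizes(df: DataFrame): Array[(Int, Long)] =
  df.rdd
    .mapPartitionsWithIndex { (idx, rows) =>
      Iterator((idx, rows.size.toLong))
    }
    .collect()
    .sortBy(-_._2)

// partitionSizes(joined).take(20).foreach(println)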
On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:

> I had similar problems to this (reduce side failures for large joins (25bn
> rows with 9bn)), and found the answer was to further up the
> spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
> me, but your tables look a little denser, so you may want to go even higher.
>
> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>
>> I'm getting errors like "Removing executor with no recent heartbeats" &
>> "Missing an output location for shuffle" errors for a large SparkSql join
>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>> configure the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>> num     #instances         #bytes  class name
>> ----------------------------------------------
>>   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>   2:     251085327     8034730464  scala.Tuple2
>>   3:     243694737     5848673688  java.lang.Float
>>   4:     231198778     5548770672  java.lang.Integer
>>   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>   7:      74114058     1778737392  java.lang.Long
>>   8:       6059103      779203840  [Ljava.lang.Object;
>>   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>  10:         34749       70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval 60s
>> spark.executor.memory 32g
>> spark.mesos.coarse false
>> spark.network.timeout 600s
>> spark.shuffle.blockTransferService netty
>> spark.shuffle.consolidateFiles true
>> spark.shuffle.file.buffer 1m
>> spark.shuffle.io.maxRetries 6
>> spark.shuffle.manager sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000
>> but that doesn't seem to help. Would increasing the partitions help? Is
>> there a formula to determine an approximate partitions number value for a
>> join?
>> Any help with this job would be appreciated!
>>
>> cheers,
>> Tom
>>
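PS: regarding the "is there a formula" question from my first mail, the back-of-the-envelope sizing I'm planning to try next, assuming the common rule of thumb of ~128MB of shuffle data per partition (not an official formula), looks roughly like this:

// Rough sizing sketch, run e.g. in spark-shell where `sqlContext` is in scope.
// The 7TB figure comes from the stage's shuffle-write metric in the UI.
val estimatedShuffleBytes   = 7L * 1024 * 1024 * 1024 * 1024  // ~7TB of shuffle writes
val targetBytesPerPartition = 128L * 1024 * 1024              // ~128MB per task (rule of thumb)
val partitions = (estimatedShuffleBytes / targetBytesPerPartition).toInt  // ~57k

sqlContext.setConf("spark.sql.shuffle.partitions", partitions.toString)

That would put me noticeably above the 30k I tried last, which at least matches Jason's experience that higher is better for denser tables.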