Yeah, I tried with 10k and 30k and those still failed, so I'll try with more.
It's a little disappointing though: the job only writes ~7TB of shuffle data,
which in theory shouldn't require more than 1000 reducers on my 10TB-memory
cluster (~7GB of spill per reducer).
I'm now wondering whether my shuffle partitions are uneven and I should use a
custom partitioner. Is there a way to get stats on the partition sizes from
Spark?
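
So far the closest thing I have is eyeballing the per-task Shuffle Read sizes
on the stage detail page in the UI, or counting rows per partition on the RDD
behind the joined DataFrame. Rough sketch (df here is just a placeholder for
the joined DataFrame, and it costs an extra pass over the data):

    // count rows in each partition of the joined DataFrame (df is a placeholder name)
    val partitionSizes = df.rdd
      .mapPartitionsWithIndex { case (idx, rows) => Iterator((idx, rows.size)) }
      .collect()

    // print the 20 largest partitions to spot skew
    partitionSizes.sortBy(-_._2).take(20).foreach { case (idx, n) =>
      println(s"partition $idx: $n rows")
    }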

On Fri, Aug 28, 2015 at 12:46 PM, Jason <ja...@jasonknight.us> wrote:

> I had similar problems to this (reduce-side failures for large joins (25bn
> rows with 9bn)), and found the answer was to increase
> spark.sql.shuffle.partitions well beyond 1000. In my case, 16k partitions
> worked for me, but your tables look a little denser, so you may want to go
> even higher.
>
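> For reference, a minimal sketch of bumping it from code (assuming a
> SQLContext or HiveContext named sqlContext; Spark 1.4):
>
>     // raise the number of post-shuffle partitions used by SQL joins/aggregations
>     sqlContext.setConf("spark.sql.shuffle.partitions", "16000")
>
> The same setting can also be passed at submit time via
> --conf spark.sql.shuffle.partitions=16000.
>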
> On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak <tom...@gmail.com> wrote:
>
>> I'm getting "Removing executor with no recent heartbeats" and "Missing an
>> output location for shuffle" errors for a large Spark SQL join (1bn
>> rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how to configure
>> the job to avoid them.
>>
>> The initial stage completes fine with some 30k tasks on a cluster with 70
>> machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>> the shuffle stage first waits 30min in the scheduling phase according to
>> the UI, and then dies with the mentioned errors.
>>
>> I can see in the GC logs that the executors reach their memory limits
>> (32g per executor, 2 workers per machine) and can't allocate any more stuff
>> in the heap. Fwiw, the top 10 in the memory use histogram are:
>>
>> num     #instances         #bytes  class name
>> ----------------------------------------------
>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>    2:     251085327     8034730464  scala.Tuple2
>>    3:     243694737     5848673688  java.lang.Float
>>    4:     231198778     5548770672  java.lang.Integer
>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>    7:      74114058     1778737392  java.lang.Long
>>    8:       6059103      779203840  [Ljava.lang.Object;
>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>   10:         34749       70122104  [B
>>
>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>
>> spark.core.connection.ack.wait.timeout 600
>> spark.executor.heartbeatInterval       60s
>> spark.executor.memory                  32g
>> spark.mesos.coarse                     false
>> spark.network.timeout                  600s
>> spark.shuffle.blockTransferService     netty
>> spark.shuffle.consolidateFiles         true
>> spark.shuffle.file.buffer              1m
>> spark.shuffle.io.maxRetries            6
>> spark.shuffle.manager                  sort
>>
>> The join is currently configured with spark.sql.shuffle.partitions=1000,
>> but that doesn't seem to help. Would increasing the number of partitions
>> help? Is there a formula for estimating an appropriate partition count for
>> a join?
>> Any help with this job would be appreciated!
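>>
>> (One back-of-envelope sizing I can think of, not an official formula: aim
>> for a few hundred MB of shuffle data per partition. With ~6.5TB of shuffle
>> writes and a ~200MB target, that would be roughly 6.5TB / 200MB ≈ 32,500
>> partitions, i.e. well above the current 1000. Does that kind of estimate
>> make sense here?)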
>>
>> cheers,
>> Tom
>>
>
