I'm getting errors like "Removing executor with no recent heartbeats" &
"Missing an output location for shuffle" errors for a large SparkSql join
(1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
configure the job to avoid them.

The initial stage completes fine with some 30k tasks on a cluster with 70
machines/10TB memory, generating about 6.5TB of shuffle writes, but then
the shuffle stage first waits 30min in the scheduling phase according to
the UI, and then dies with the mentioned errors.

I can see in the GC logs that the executors reach their memory limits (32g
per executor, 2 workers per machine) and can't allocate any more stuff in
the heap. Fwiw, the top 10 in the memory use histogram are:

num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560
 scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624
 scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout 600
spark.executor.heartbeatInterval       60s
spark.executor.memory                  32g
spark.mesos.coarse                     false
spark.network.timeout                  600s
spark.shuffle.blockTransferService     netty
spark.shuffle.consolidateFiles         true
spark.shuffle.file.buffer              1m
spark.shuffle.io.maxRetries            6
spark.shuffle.manager                  sort

The join is currently configured with spark.sql.shuffle.partitions=1000 but
that doesn't seem to help. Would increasing the partitions help ? Is there
a formula to determine an approximate partitions number value for a join ?
Any help with this job would be appreciated !

cheers,
Tom

Reply via email to