I'm getting "Removing executor with no recent heartbeats" and "Missing an output location for shuffle" errors for a large Spark SQL join (1bn rows/2.5TB joined with 1bn rows/30GB), and I'm not sure how to configure the job to avoid them.
The initial stage completes fine with some 30k tasks on a cluster with 70 machines/10TB memory, generating about 6.5TB of shuffle writes. The shuffle stage then sits for 30 minutes in the scheduling phase according to the UI, and finally dies with the errors above. I can see in the GC logs that the executors hit their memory limits (32g per executor, 2 workers per machine) and can't allocate anything more on the heap.

For what it's worth, the top 10 entries in the memory histogram are:

 num     #instances         #bytes  class name
----------------------------------------------
   1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
   2:     251085327     8034730464  scala.Tuple2
   3:     243694737     5848673688  java.lang.Float
   4:     231198778     5548770672  java.lang.Integer
   5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
   6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
   7:      74114058     1778737392  java.lang.Long
   8:       6059103      779203840  [Ljava.lang.Object;
   9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
  10:         34749       70122104  [B

The relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

spark.core.connection.ack.wait.timeout  600
spark.executor.heartbeatInterval        60s
spark.executor.memory                   32g
spark.mesos.coarse                      false
spark.network.timeout                   600s
spark.shuffle.blockTransferService      netty
spark.shuffle.consolidateFiles          true
spark.shuffle.file.buffer               1m
spark.shuffle.io.maxRetries             6
spark.shuffle.manager                   sort

The join is currently run with spark.sql.shuffle.partitions=1000, but that doesn't seem to help. Would increasing the number of partitions help? Is there a formula for estimating a reasonable partition count for a join?

Any help with this job would be appreciated!

cheers,
Tom
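
P.S. In case it helps, here is roughly how the job is wired up. This is a simplified sketch only: the paths, the "id" join key and the parquet format are placeholders; the real datasets and settings are the ones described above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Cluster/timeout settings as listed above (abbreviated here).
val conf = new SparkConf()
  .setAppName("large-join")
  .set("spark.executor.memory", "32g")
  .set("spark.network.timeout", "600s")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// The knob in question; currently set to 1000.
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

val big   = sqlContext.read.parquet("/data/big")    // ~1bn rows / 2.5TB
val small = sqlContext.read.parquet("/data/small")  // ~1bn rows / 30GB

// Plain equi-join on a shared key column ("id" is a placeholder name).
val joined = big.join(small, big("id") === small("id"))
joined.write.parquet("/data/joined")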