Try increasing the shuffle memory fraction (by default it is only 16%). Again, if you run Spark 1.5, this will probably run a lot faster, especially if you increase the shuffle memory fraction ...
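For 1.4.x the knob would be spark.shuffle.memoryFraction (the 16% is the 0.2 default times the 0.8 safety fraction). A rough sketch of bumping it, assuming the job caches little so some of spark.storage.memoryFraction can be traded away; the 0.4/0.3 values are only illustrative, not a recommendation for this particular job:

    // Sketch only: raise the shuffle fraction at the cost of the storage fraction.
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("large-join")
      .set("spark.shuffle.memoryFraction", "0.4")  // default 0.2 in 1.4.x
      .set("spark.storage.memoryFraction", "0.3")  // default 0.6; shrink it if you cache little
    val sc = new SparkContext(conf)
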
On Tue, Sep 1, 2015 at 8:13 AM, Thomas Dudziak <tom...@gmail.com> wrote:
> While it works with sort-merge-join, it takes about 12h to finish (with
> 10000 shuffle partitions). My hunch is that the reason for that is this:
>
> INFO ExternalSorter: Thread 3733 spilling in-memory map of 174.9 MB to
> disk (62 times so far)
>
> (and lots more where this comes from).
>
> On Sat, Aug 29, 2015 at 7:17 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> Can you try 1.5? This should work much, much better in 1.5 out of the box.
>>
>> For 1.4, I think you'd want to turn on sort-merge-join, which is off by
>> default. However, the sort-merge join in 1.4 can still trigger a lot of
>> garbage, making it slower. SMJ performance is probably 5x - 1000x better in
>> 1.5 for your case.
>>
>> On Thu, Aug 27, 2015 at 6:03 PM, Thomas Dudziak <tom...@gmail.com> wrote:
>>
>>> I'm getting errors like "Removing executor with no recent heartbeats" and
>>> "Missing an output location for shuffle" for a large Spark SQL join
>>> (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
>>> configure the job to avoid them.
>>>
>>> The initial stage completes fine with some 30k tasks on a cluster with
>>> 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
>>> the shuffle stage first waits 30min in the scheduling phase according to
>>> the UI, and then dies with the mentioned errors.
>>>
>>> I can see in the GC logs that the executors reach their memory limits
>>> (32g per executor, 2 workers per machine) and can't allocate any more
>>> objects on the heap. FWIW, the top 10 in the memory use histogram are:
>>>
>>>  num     #instances         #bytes  class name
>>> ----------------------------------------------
>>>    1:     249139595    11958700560  scala.collection.immutable.HashMap$HashMap1
>>>    2:     251085327     8034730464  scala.Tuple2
>>>    3:     243694737     5848673688  java.lang.Float
>>>    4:     231198778     5548770672  java.lang.Integer
>>>    5:      72191585     4298521576  [Lscala.collection.immutable.HashMap;
>>>    6:      72191582     2310130624  scala.collection.immutable.HashMap$HashTrieMap
>>>    7:      74114058     1778737392  java.lang.Long
>>>    8:       6059103      779203840  [Ljava.lang.Object;
>>>    9:       5461096      174755072  scala.collection.mutable.ArrayBuffer
>>>   10:         34749       70122104  [B
>>>
>>> Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):
>>>
>>> spark.core.connection.ack.wait.timeout 600
>>> spark.executor.heartbeatInterval 60s
>>> spark.executor.memory 32g
>>> spark.mesos.coarse false
>>> spark.network.timeout 600s
>>> spark.shuffle.blockTransferService netty
>>> spark.shuffle.consolidateFiles true
>>> spark.shuffle.file.buffer 1m
>>> spark.shuffle.io.maxRetries 6
>>> spark.shuffle.manager sort
>>>
>>> The join is currently configured with spark.sql.shuffle.partitions=1000
>>> but that doesn't seem to help. Would increasing the partitions help? Is
>>> there a formula to determine an approximate number of partitions for a
>>> join?
>>> Any help with this job would be appreciated!
>>>
>>> cheers,
>>> Tom
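
For the 1.4 sort-merge-join route discussed in the quoted thread, a minimal sketch of what that could look like; if I remember correctly the 1.4.x switch is spark.sql.planner.sortMergeJoin, and the table/column names ("bigA", "bigB", "key") and the 10000-partition figure are only placeholders taken from the numbers above:

    // Sketch only: enable sort-merge join on 1.4.x and give the shuffle more partitions.
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // reuses the SparkContext from the sketch above
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")  // off by default in 1.4
    sqlContext.setConf("spark.sql.shuffle.partitions", "10000")    // default is 200

    val bigA = sqlContext.table("bigA")  // 1bn rows / 2.5TB
    val bigB = sqlContext.table("bigB")  // 1bn rows / 30GB
    val joined = bigA.join(bigB, bigA("key") === bigB("key"))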