I set SPARK_MEM in the driver process by setting "spark.executor.memory" to 10G. Each machine had 32G of RAM and a dedicated 32G spill volume. I believe all of the units are in pages, and the page size is the standard 4K. There are 15 slave nodes in the cluster and the sizes of the datasets I'm trying to join are about 2.5G and 25G when serialized and compressed in the RDD cache.
I appreciate that Python lacks the type of heap size controls available in Java, but lack any concept of how the different computational tasks are partitioned between Java and Python in pyspark (so it's unclear to me how much freedom python should have to chew through tons of memory). A couple questions which this raises for me are: -Are there any parameters I could tune differently to try and prevent this crashing behavior? -Do we know why this doesn't spill to disk (as Patrick Wendell mentions that shuffle spill is for aggregations which occur during the reduce phase)? -Do we have any hunch about what computation is occurring when the crash occurs? I'd definitely appreciate the insight of others, and am willing to run experiments and send results/errors/logs out. Also, I'm physically located in Soda Hall (Berkeley) so if anyone near by is interested to examine this first hand I am glad to meet up. best, -Brad On Wed, Apr 9, 2014 at 4:21 AM, Andrew Ash <and...@andrewash.com> wrote: > A JVM can easily be limited in how much memory it uses with the -Xmx > parameter, but Python doesn't have memory limits built in in such a > first-class way. Maybe the memory limits aren't making it to the python > executors. > > What was your SPARK_MEM setting? The JVM below seems to be using 603201 > (pages?) and the 3 large python processes each are using ~1800000 (pages?). > I'm unsure the units that the OOM killer's RSS column is in. Could be > either pages (4kb each) or bytes. > > > Apr 8 11:19:19 bennett kernel: [86368.978326] [ 2348] 1002 2348 12573 > 2102 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978329] [ 2349] 1002 2349 12573 > 2101 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978332] [ 2350] 1002 2350 12573 > 2101 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978336] [ 5115] 1002 5115 12571 > 2101 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978339] [ 5116] 1002 5116 12571 > 2101 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978341] [ 5117] 1002 5117 12571 > 2101 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978344] [ 7725] 1002 7725 12570 > 2098 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978347] [ 7726] 1002 7726 12570 > 2098 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978350] [ 7727] 1002 7727 12570 > 2098 22 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978353] [10324] 1002 10324 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978356] [10325] 1002 10325 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978359] [10326] 1002 10326 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978362] [12668] 1002 12668 603201 > 47932 190 0 0 java > Apr 8 11:19:19 bennett kernel: [86368.978366] [13295] 1002 13295 12570 > 2100 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978368] [13296] 1002 13296 12570 > 2100 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978371] [13297] 1002 13297 12570 > 2100 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978375] [15192] 1002 15192 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978377] [15193] 1002 15193 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978379] [15195] 1002 15195 12570 > 2098 23 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978381] [15198] 1002 15198 1845471 > 1818463 3573 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978383] [15200] 1002 15200 1710479 > 1686492 3316 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978384] [15201] 1002 15201 1788470 > 1762344 3463 0 0 python > Apr 8 11:19:19 bennett kernel: [86368.978386] Out of memory: Kill process > 15198 (python) score 221 or sacrifice child > Apr 8 11:19:19 bennett kernel: [86368.978389] Killed process 15198 (python) > total-vm:7381884kB, anon-rss:7273852kB, file-rss:0kB > > > On Tue, Apr 8, 2014 at 2:56 PM, Brad Miller <bmill...@eecs.berkeley.edu> > wrote: >> >> Hi All, >> >> I poked around a bit more to (1) confirm my suspicions that the crash >> was related to memory consumption and (2) figure out why there is no >> error shown in 12_stderr, the spark executor log file from the >> executors on bennett.research.intel.research.net. >> >> The syslog file (from /var/log/syslog on bennett, attached) shows that >> the machine ran out of memory, the memory was mostly consumed by 1 >> java process and 3 python processes (I am running pyspark with 3 cores >> per machine), and then the kernel began killing java and python >> processes to ease memory pressure. It seems likely that these >> processes were the spark processes, and there's no errors recorded in >> 12_stderr because the process was killed by the OS (rather than >> experiencing an unhandled "cannot allocate memory" exception). >> >> I'm a little confused how Spark could consume so much memory during >> the reduce phase of the shuffle. Shouldn't Spark remain within the >> SPARK_MEM limitations on memory consumption, and spill to disk in the >> event that there isn't enough memory? >> >> -Brad >> >> >> On Tue, Apr 8, 2014 at 12:50 PM, Brad Miller <bmill...@eecs.berkeley.edu> >> wrote: >> > Hi Patrick, >> > >> >> The shuffle data is written through the buffer cache of the operating >> >> system, so you would expect the files to show up there immediately and >> >> probably to show up as being their full size when you do "ls". In >> >> reality >> >> though these are likely residing in the OS cache and not on disk. >> > >> > I see. Perhaps the memory consumption is related to this? >> > >> >> Could you paste the error here? >> > >> > While I have definitely seen "cannot allocate memory" errors while >> > trying to do this, I am unable to reproduce one now. Instead, I am >> > able to produce "most recent failure: unknown" (see full error >> > displayed in my iPython session below). Initially, I assumed there >> > was some sort of non-determinism which caused the error to >> > occasionally be "unknown", but now I realize that it may have been a >> > consistent change which occurred when I updated to the latest >> > brach-0.9 (previously I was running a version I pulled around March >> > 10th). >> > >> > Py4JJavaError: An error occurred while calling o274.collect. >> > : org.apache.spark.SparkException: Job aborted: Task 2.0:29 failed 4 >> > times (most recent failure: unknown) >> > at >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) >> > at >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) >> > at >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> > at >> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> > at >> > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) >> > at >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) >> > at >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) >> > at scala.Option.foreach(Option.scala:236) >> > at >> > org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) >> > at >> > org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) >> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) >> > at akka.actor.ActorCell.invoke(ActorCell.scala:456) >> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) >> > at akka.dispatch.Mailbox.run(Mailbox.scala:219) >> > at >> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) >> > at >> > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> > at >> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> > at >> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> > at >> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> > >> >> From the logs it looks like your executor has died. Would you be able >> >> to >> >> paste the log from the executor with the exact failure? It would show >> >> up in >> >> the /work directory inside of spark's directory on the cluster. >> > >> > I've attached the logging output from the driver from when I re-ran >> > the join operation this morning. It seems that specific, individual >> > workers (or perhaps executors is the right term?) begin to die and >> > then are re-launched by the master/driver. When I examine the >> > app-xxx...xxx folder corresponding to this job on the first worker to >> > fail (bennett.research.intel-research.net), there are several numbered >> > folders inside (12, 15, 21, 22, 23) which seem to correspond to each >> > invocation of the executor as recorded in the driver log. stdout is >> > consistently empty, and stderr is not. I have attached all of these >> > logs as <executor_id>_stderr. >> > >> > Surprisingly, 12_stderr does not record any sort of error, although >> > 15_stderr, 21_stderr, and 22_stderr do. These errors are all of the >> > form: >> > >> > 14/04/08 11:19:42 ERROR Executor: Uncaught exception in thread >> > Thread[stdin writer for python,5,main] >> > org.apache.spark.FetchFailedException: Fetch failed: null 0 -1 44 >> > at >> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:316) >> > at >> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:314) >> > at >> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> > at >> > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> > at >> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> > at >> > scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> > at >> > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >> > at >> > org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:313) >> > at >> > org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:139) >> > at >> > org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43) >> > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61) >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) >> > at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) >> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) >> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) >> > at >> > org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85) >> > Caused by: java.lang.Exception: Missing an output location for shuffle 0 >> > at >> > org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:317) >> > ... 17 more >> > >> > Note that in "Fetch failed: null 0 -1 44" the last numeral varies in >> > each of the errors. I have also attached an executor stderr trace >> > from malkin.research.intel-research.net, which one of several >> > executors to crash after bennett. Notice that this trace does contain >> > errors, although they seem to be saying that there's trouble >> > connecting to bennett. >> > >> > If anyone would like additional information, please let me know. >> > >> > best, >> > -Brad >> > >> > On Mon, Apr 7, 2014 at 9:40 PM, Patrick Wendell <pwend...@gmail.com> >> > wrote: >> >> >> >> >> >> >> >> On Mon, Apr 7, 2014 at 7:37 PM, Brad Miller >> >> <bmill...@eecs.berkeley.edu> >> >> wrote: >> >>> >> >>> I am running the latest version of PySpark branch-0.9 and having some >> >>> trouble with join. >> >>> >> >>> One RDD is about 100G (25GB compressed and serialized in memory) with >> >>> 130K records, the other RDD is about 10G (2.5G compressed and >> >>> serialized in memory) with 330K records. I load each RDD from HDFS, >> >>> invoke keyBy to key each record, and then attempt to join the RDDs. >> >>> >> >>> The join consistently crashes at the beginning of the reduce phase. >> >>> Note that when joining the 10G RDD to itself there is no problem. >> >>> >> >>> Prior to the crash, several suspicious things happen: >> >>> >> >>> -All map output files from the map phase of the join are written to >> >>> spark.local.dir, even though there should be plenty of memory left to >> >>> contain the map output. I am reasonably sure *all* map outputs are >> >>> written to disk because the size of the map output spill directory >> >>> matches the size of the shuffle write (as displayed in the user >> >>> interface) for each machine. >> >> >> >> >> >> The shuffle data is written through the buffer cache of the operating >> >> system, so you would expect the files to show up there immediately and >> >> probably to show up as being their full size when you do "ls". In >> >> reality >> >> though these are likely residing in the OS cache and not on disk. >> >> >> >>> >> >>> -In the beginning of the reduce phase of the join, memory consumption >> >>> on each work spikes and each machine runs out of memory (as evidenced >> >>> by a "Cannon allocate memory" exception in Java). This is >> >>> particularly surprising since each machine has 30G of ram and each >> >>> spark worker has only been allowed 10G. >> >> >> >> >> >> Could you paste the error here? >> >> >> >>> >> >>> -In the web UI both the "Shuffle Spill (Memory)" and "Shuffle Spill >> >>> (Disk)" fields for each machine remain at 0.0 despite shuffle files >> >>> being written into spark.local.dir. >> >> >> >> >> >> Shuffle spill is different than the shuffle files written to >> >> spark.local.dir. Shuffle spilling is for aggregations that occur on the >> >> reduce side of the shuffle. A join like this might not see any >> >> spilling. >> >> >> >> >> >> >> >> From the logs it looks like your executor has died. Would you be able >> >> to >> >> paste the log from the executor with the exact failure? It would show >> >> up in >> >> the /work directory inside of spark's directory on the cluster. > >