On re-running the cache statement, I see from the logs that whenever collect (Stage 1) fails, it causes mapPartitions (Stage 0) to be re-run for one partition. This can be seen in the collect log as well as in the container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

The data is an LZO-compressed sequence file with a compressed size of ~26 GB. Is there a way to understand why the shuffle keeps failing for one partition? I believe we have enough memory to hold the uncompressed data in memory.
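For what it's worth, the back-of-envelope sizing behind that "enough memory" claim is roughly the following sketch; the LZO expansion ratio is a guess on my part, and spark.storage.memoryFraction is assumed to be at its default:

    // Rough sizing check (assumed numbers, not measured)
    object CacheSizing extends App {
      val compressedGb    = 26.0   // on-disk size of the lzo sequence files
      val assumedLzoRatio = 4.0    // hypothetical expansion factor for this data
      val uncompressedGb  = compressedGb * assumedLzoRatio    // ~104 GB to cache

      val clusterMemoryGb = 1024.0 // "combined memory > 1 TB" across executors
      val storageFraction = 0.6    // spark.storage.memoryFraction default
      val cacheCapacityGb = clusterMemoryGb * storageFraction // ~614 GB for caching

      println(f"need ~$uncompressedGb%.0f GB, have ~$cacheCapacityGb%.0f GB of storage memory")
    }

So even with generous assumptions the cached partition should fit several times over, which is why the repeated fetch failures are surprising.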
On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood <sadhan.s...@gmail.com> wrote:

> This is the log output:
>
> 2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112'
> 2014-11-12 19:07:17,455 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
> 2014-11-12 19:07:17,756 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at TableReader.scala:68
> 2014-11-12 19:07:18,292 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
> 2014-11-12 19:07:22,801 INFO mapred.FileInputFormat (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
> 2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at Exchange.scala:86)
> 2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
> 2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at SparkPlan.scala:84)
> 2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
> 2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
> 2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
> 2014-11-12 19:07:22,916 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at DAGScheduler.scala:838
> 2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
> 2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 161.113 s
> 2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
> 2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
> 2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
> 2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
> 2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
> 2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
> 2014-11-12 19:10:04,112 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at DAGScheduler.scala:838
> 2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
> 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 52 on ip-10-61-175-167.ec2.internal: remote Akka client disassociated
> 2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
> 2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
> 2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) - Stage 0 is now unavailable on executor 52 (460/461, false)
> 2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at SparkPlan.scala:84) as failed due to a fetch failure from Stage 0 (mapPartitions at Exchange.scala:86)
> 2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84) failed in 4.571 s
> 2014-11-12 19:10:08,687 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch failure
> 2014-11-12 19:10:08,908 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting failed stages
> 2014-11-12 19:10:08,974 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
> 2014-11-12 19:10:08,989 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at DAGScheduler.scala:838
> 2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
> 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 66.475 s
> 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
> 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
> 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
> 2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
> 2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
> 2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
> 2014-11-12 19:11:15,482 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at DAGScheduler.scala:838
> 2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 372 on ip-10-95-163-84.ec2.internal: remote Akka client disassociated
> 2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
> 2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
>
> On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood <sadhan.s...@gmail.com> wrote:
>
>> We are running Spark on YARN with combined memory > 1 TB, and when trying to cache a table partition (which is < 100 GB) we are seeing a lot of failed collect stages in the UI, and the cache never succeeds. Because of the failed collect, it seems like the mapPartitions stage keeps getting resubmitted. We have more than enough memory, so it is surprising that we are seeing this issue. Can someone please help? Thanks!
>>
>> The stack trace of the failed collect from the UI is:
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
>> at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>> at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>> at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>> at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)