This has all the symptoms of YARN killing your executors because they exceeded their memory limits. Could you check your RM/NM logs to see if that's the case? (The error came from an executor on domU-12-31-39-0B-F1-D1.compute-1.internal, so that NM's log file is the one to check.) If so, you'll need to raise the "spark.yarn.executor.memoryOverhead" config (see http://spark.apache.org/docs/latest/running-on-yarn.html).
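If YARN did kill them, the NM log on that host should show the container being killed for exceeding its physical memory limit, along with the limit it blew past. A minimal sketch of raising the overhead from the job itself, assuming the Scala job quoted below (the 1024 is an illustrative value only; size it to whatever the NM log reports):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("foo")
      // Headroom, in MB, that YARN grants each executor container on top of
      // the executor heap; off-heap allocations (JVM overhead, netty buffers,
      // etc.) must fit in it. The default is only a few hundred MB.
      .set("spark.yarn.executor.memoryOverhead", "1024")

The same property can go in conf/spark-defaults.conf instead, if you'd rather not hard-code it.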
On Tue, Sep 9, 2014 at 1:13 PM, Penny Espinoza <pesp...@societyconsulting.com> wrote:
> Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs
> successfully using master = local[4]. However, when I run it on a Hadoop
> 2.2 EMR cluster using master yarn-client, it fails after running for
> about 5 minutes. My main method does something like this:
>
> gets streaming context
> creates a DStream from KafkaUtils.createStream (let's call this a)
> creates another DStream from a.flatMap (let's call this b)
> creates another DStream from b.updateStateByKey (c)
> creates another DStream from c.flatMap (d)
> runs d.foreachRDD, and uses Tuplejump's Calliope cql3SaveToCassandra to
> save data to Cassandra
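For readers following the thread, that pipeline would look roughly like this (a sketch only, not Penny's actual code: the 5-second batch interval is a guess, and conf, checkpointDir, zkQuorum, groupId, topics, parse, updateFunc, and toRows are placeholders; the Cassandra write goes through Calliope's own API, so it is left as a comment):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)  // updateStateByKey requires a checkpoint dir

    // a: raw (key, message) stream from Kafka
    val a = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    // b: fan each message out into zero or more (key, value) records
    val b = a.flatMap { case (_, msg) => parse(msg) }
    // c: running state per key, carried across batches
    val c = b.updateStateByKey(updateFunc)
    // d: flatten the state back out into rows to write
    val d = c.flatMap { case (key, state) => toRows(key, state) }
    // write each batch out via Calliope's cql3SaveToCassandra, as in the original job
    d.foreachRDD { rdd => /* rdd.cql3SaveToCassandra(...) */ }

    ssc.start()
    ssc.awaitTermination()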
> Here's the error message:
>
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
> According to the logs, the block referenced in the error message was added
> to memory only about 10 seconds before this error appears, and there is no
> evidence in the log of any blocks being cleared from memory, or of it
> running out of memory. Below is a snippet of the log:
>
> 14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, free: 490.8 MB)
> ... (several other messages like the one above, but for different blocks)
> 14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 457.3 MB)
> 14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks
> 14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 1410293160000 ms to StorageLevel(false, true, false, false, 1) at time 1410293160000 ms
> 14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 1410293160000 ms for checkpointing at time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 1410293160000 ms.0 from job set of time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 1410293160000 ms to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
> 14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at FlatMappedDStream.scala:35)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output partitions (allowLocal=false)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 24)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no missing parents
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293110000.bk
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 1410293160000 ms saved to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000', took 4351 bytes and 55 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Clearing checkpoint data for time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Cleared checkpoint data for time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting 21 missing tasks from Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35)
> 14/09/09 20:06:00 INFO cluster.YarnClientClusterScheduler: Adding task set 24.0 with 21 tasks
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:0 as TID 91 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:0 as 1437 bytes in 0 ms
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:1 as TID 92 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:1 as 1437 bytes in 0 ms
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Lost TID 91 (task 24.0:0)
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
> I have seen earlier threads recommend getting around this problem by
> setting spark.streaming.unpersist to false, but that had no effect on the
> error. I have also tried making some changes to shuffle-related settings.
> My conf from the logs is below.
>
> 14/09/09 20:04:32 INFO spark.SparkContext: Spark configuration:
> spark.app.name=foo
> spark.cleaner.ttl=86400
> spark.home=/home/hadoop/spark
> spark.jars=file:/home/hadoop/rna/rna-spark-streaming-assembly-1.0-SNAPSHOT.jar
> spark.logConf=true
> spark.master=yarn-client
> spark.shuffle.consolidateFiles=true
> spark.shuffle.memoryFraction=0.4
> spark.storage.memoryFraction=0.5
> spark.streaming.unpersist=false
> spark.yarn.dist.archives=file:/home/hadoop/rna/lib/jackson-core-asl-1.8.9.jar,file:/home/hadoop/rna/lib/jackson-mapper-asl-1.8.9.jar
>
> Any idea what might be happening to this block and how I might eliminate
> this error?
>
> thanks
> p

--
Marcelo