Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs successfully with master = local[4]. However, when I run it on a Hadoop 2.2 EMR cluster with master = yarn-client, it fails after running for about 5 minutes. My main method does something like this:
1. gets a streaming context
2. creates a DStream from KafkaUtils.createStream (let’s call this a)
3. creates another DStream from a.flatMap (let’s call this b)
4. creates another DStream from b.updateStateByKey (c)
5. creates another DStream from c.flatMap (d)
6. runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save the data to Cassandra

Here’s the error message:

14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

According to the logs, the block referenced in the error message was added to memory only about 10 seconds before the error appears, and there is no evidence in the logs of any blocks being cleared from memory, or of the executor running out of memory.
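In code, the main method looks roughly like this. This is only a sketch of the shape of the job, not the real code: the batch interval, Kafka parameters (zkQuorum, groupId, topic), and the helpers parseRecords, updateFn, stateToRows, and the Calliope CasBuilder cas are placeholders, and it assumes spark-streaming-kafka and Calliope (for the cql3SaveToCassandra implicits) on the classpath:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// 1. get the streaming context (batch interval is a placeholder)
val ssc = new StreamingContext(new SparkConf(), Seconds(5))
ssc.checkpoint("hdfs://10.240.49.124:9000/user/hadoop/checkpoint")

// 2. DStream from Kafka (zkQuorum, groupId, topic are placeholders)
val a = KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))

// 3. flatMap the raw (key, message) pairs into records (parseRecords is a placeholder)
val b = a.flatMap { case (_, msg) => parseRecords(msg) }

// 4. maintain running state per key; updateFn: (Seq[V], Option[S]) => Option[S]
val c = b.updateStateByKey(updateFn)

// 5. flatten the state into rows to write (stateToRows is a placeholder)
val d = c.flatMap(stateToRows)

// 6. save each batch to Cassandra via Calliope's cql3 implicits (import elided)
d.foreachRDD { rdd =>
  rdd.cql3SaveToCassandra(cas) // cas: a Calliope CasBuilder for the target table
}

ssc.start()
ssc.awaitTermination()
```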
Below is a snippet of the log:

14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, free: 490.8 MB)
... (several other messages like the one above, but for different blocks)
14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 457.3 MB)
14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks
14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 1410293160000 ms to StorageLevel(false, true, false, false, 1) at time 1410293160000 ms
14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 1410293160000 ms for checkpointing at time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 1410293160000 ms.0 from job set of time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 1410293160000 ms to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output partitions (allowLocal=false)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 23 (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no missing parents
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293110000.bk
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 1410293160000 ms saved to file 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000', took 4351 bytes and 55 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Clearing checkpoint data for time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Cleared checkpoint data for time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting 21 missing tasks from Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO cluster.YarnClientClusterScheduler: Adding task set 24.0 with 21 tasks
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:0 as TID 91 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:0 as 1437 bytes in 0 ms
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:1 as TID 92 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:1 as 1437 bytes in 0 ms
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Lost TID 91 (task 24.0:0)
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

I have seen earlier threads recommend getting around this problem by setting spark.streaming.unpersist to false, but that had no effect on the error. I have also tried making some changes to shuffle-related settings. My conf, taken from the logs, is below:

14/09/09 20:04:32 INFO spark.SparkContext: Spark configuration:
spark.app.name=foo
spark.cleaner.ttl=86400
spark.home=/home/hadoop/spark
spark.jars=file:/home/hadoop/rna/rna-spark-streaming-assembly-1.0-SNAPSHOT.jar
spark.logConf=true
spark.master=yarn-client
spark.shuffle.consolidateFiles=true
spark.shuffle.memoryFraction=0.4
spark.storage.memoryFraction=0.5
spark.streaming.unpersist=false
spark.yarn.dist.archives=file:/home/hadoop/rna/lib/jackson-core-asl-1.8.9.jar,file:/home/hadoop/rna/lib/jackson-mapper-asl-1.8.9.jar

Any idea what might be happening to this block, and how I might eliminate this error?

thanks
p