Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs 
successfully with master = local[4].  However, when I run it on a Hadoop 2.2 
EMR cluster with master = yarn-client, it fails after running for about 5 
minutes.  My main method does something like this:


  1.  gets a streaming context
  2.  creates a DStream from KafkaUtils.createStream (let’s call this a)
  3.  creates another DStream from a.flatMap (let’s call this b)
  4.  creates another DStream from b.updateStateByKey (c)
  5.  creates another DStream from c.flatMap (d)
  6.  runs d.foreachRDD and saves the data to Cassandra using Tuplejump’s 
Calliope cql3SaveToCassandra
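
In code, the steps above look roughly like this (a sketch against the Spark 
1.0.x API; the batch interval, ZooKeeper address, topic name, and update 
function are placeholders rather than my actual job, and the Calliope save 
call is only indicated in a comment since it goes through Calliope's own 
implicits):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._  // pair DStream functions
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("foo")
val ssc = new StreamingContext(conf, Seconds(5))       // 1. streaming context
ssc.checkpoint("hdfs:///user/hadoop/checkpoint")       // required by updateStateByKey

// 2. Kafka receiver DStream of (key, message) pairs
val a = KafkaUtils.createStream(ssc, "zkHost:2181", "someGroup",
  Map("someTopic" -> 1))

// 3. flatMap into (key, value) pairs
val b = a.flatMap { case (_, msg) => msg.split(" ").map(w => (w, 1L)) }

// 4. stateful DStream: running count per key (placeholder update function)
val c = b.updateStateByKey[Long] { (values, state) =>
  Some(state.getOrElse(0L) + values.sum)
}

// 5. flatMap the state into output records
val d = c.flatMap { case (k, v) => Seq(k + ":" + v) }

// 6. save each batch to Cassandra
d.foreachRDD { rdd =>
  // rdd.cql3SaveToCassandra(...)   // via Calliope (details omitted)
}

ssc.start()
ssc.awaitTermination()
```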

Here’s the error message:

14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not 
found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

According to the logs, the block referenced in the error message was added to 
memory only about 10 seconds before the error appears, and there is no 
evidence in the log of any blocks being cleared from memory, or of the job 
running out of memory.  Below is a snippet of the log:

14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in 
memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, 
free: 490.8 MB)
… (several other messages like the one above, but for different blocks)
14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in 
memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 
457.3 MB)
14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks
14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 
1410293160000 ms to StorageLevel(false, true, false, false, 1) at time 
1410293160000 ms
14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 
1410293160000 ms for checkpointing at time 1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 
1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 
1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for 
time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 
1410293160000 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 
1410293160000 ms.0 from job set of time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 
1410293160000 ms to file 
'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile 
at CassandraRDDFunctions.scala:203
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at 
FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 
(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output 
partitions (allowLocal=false)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 
23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: 
List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 
(FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no 
missing parents
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting 
hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293110000.bk
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 
1410293160000 ms saved to file 
'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000', 
took 4351 bytes and 55 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Clearing checkpoint data for 
time 1410293160000 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Cleared checkpoint data for time 
1410293160000 ms
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting 21 missing tasks from 
Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO cluster.YarnClientClusterScheduler: Adding task set 24.0 
with 21 tasks
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:0 as TID 91 
on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:0 as 1437 
bytes in 0 ms
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:1 as TID 92 
on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:1 as 1437 
bytes in 0 ms
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Lost TID 91 (task 24.0:0)
14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not 
found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)


I have seen earlier threads recommend working around this problem by setting 
spark.streaming.unpersist to false, but that had no effect on the error.  I 
have also tried changing some shuffle-related settings.  My conf, as reported 
in the logs, is below.

14/09/09 20:04:32 INFO spark.SparkContext: Spark configuration:
spark.app.name=foo
spark.cleaner.ttl=86400
spark.home=/home/hadoop/spark
spark.jars=file:/home/hadoop/rna/rna-spark-streaming-assembly-1.0-SNAPSHOT.jar
spark.logConf=true
spark.master=yarn-client
spark.shuffle.consolidateFiles=true
spark.shuffle.memoryFraction=0.4
spark.storage.memoryFraction=0.5
spark.streaming.unpersist=false
spark.yarn.dist.archives=file:/home/hadoop/rna/lib/jackson-core-asl-1.8.9.jar,file:/home/hadoop/rna/lib/jackson-mapper-asl-1.8.9.jar
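
For reference, the non-default settings above are applied roughly like this 
(a sketch; in my setup some of these values may instead come from 
spark-defaults or the submit command, and the jars/archives entries are 
omitted):

```scala
val conf = new org.apache.spark.SparkConf()
  .setAppName("foo")
  .setMaster("yarn-client")
  .set("spark.cleaner.ttl", "86400")
  .set("spark.logConf", "true")
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.storage.memoryFraction", "0.5")
  .set("spark.streaming.unpersist", "false")
```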

Any idea what might be happening to this block, and how I might eliminate 
this error?


thanks
p