This has all the symptoms of YARN killing your executors for exceeding
their memory limits. Could you check your RM/NM logs to see if that's
the case?

(The failed task ran on the executor at
domU-12-31-39-0B-F1-D1.compute-1.internal, so you can check that node's
NM log file.)

If that's the case, you'll need to play with the
"spark.yarn.executor.memoryOverhead" config (see
http://spark.apache.org/docs/latest/running-on-yarn.html).
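For example (just a sketch; the value is in MB and only an illustrative
starting point, and you can equally pass it with --conf on spark-submit):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("foo")
      // extra off-heap headroom YARN grants each executor container, in MB,
      // on top of spark.executor.memory
      .set("spark.yarn.executor.memoryOverhead", "1024")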

On Tue, Sep 9, 2014 at 1:13 PM, Penny Espinoza
<pesp...@societyconsulting.com> wrote:
> Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs
> successfully using master = local[4].  However, when I run it on a Hadoop
> 2.2 EMR cluster using master yarn-client, it fails after running for about 5
> minutes.  My main method does something like this:
>
> gets streaming context
> creates a DStream from KafkaUtils.createStream (let’s call this a)
> creates another DStream from a.flatMap (let’s call this b)
> creates another DStream from b.updateStateByKey (c)
> creates another DStream from c.flatMap (d)
> runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save
> data to Cassandra
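>
> In outline it looks roughly like this (just a sketch; names such as
> zkQuorum, batchSeconds, parse, updateFn, and toRows are placeholders, and
> the Kafka and Cassandra details are elided):
>
>     import org.apache.spark.SparkConf
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.kafka.KafkaUtils
>
>     val ssc = new StreamingContext(new SparkConf().setAppName("foo"),
>       Seconds(batchSeconds))                          // placeholder batch interval
>     ssc.checkpoint(checkpointDir)                     // placeholder HDFS path
>     val a = KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1))
>     val b = a.flatMap { case (_, msg) => parse(msg) } // placeholder parse into (key, value) pairs
>     val c = b.updateStateByKey(updateFn)              // placeholder state update function
>     val d = c.flatMap(toRows)                         // placeholder mapping to rows
>     d.foreachRDD { rdd =>
>       // Calliope's cql3SaveToCassandra call goes here (exact invocation elided)
>     }
>     ssc.start()
>     ssc.awaitTermination()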
>
>
> Here’s the error message:
>
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1410293154000
> not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
>
> According to the logs, the block referenced in the error message was added to
> memory only about 10 seconds before the error appeared, and there is no
> evidence in the log of any blocks being cleared from memory, or of the
> executor running out of memory.  Below is a snippet of the log:
>
> 14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000
> in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4
> KB, free: 490.8 MB)
> … (several other messages like the one above, but for different blocks)
> 14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000
> in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB,
> free: 457.3 MB)
> 14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21
> blocks
> 14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time
> 1410293160000 ms to StorageLevel(false, true, false, false, 1) at time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time
> 1410293160000 ms for checkpointing at time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job
> 1410293160000 ms.0 from job set of time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for
> time 1410293160000 ms to file
> 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
> 14/09/09 20:06:00 INFO spark.SparkContext: Starting job:
> saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap
> at FlatMappedDStream.scala:35)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12
> (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output
> partitions (allowLocal=false)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage
> 23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage:
> List(Stage 24)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage
> 24)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24
> (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no
> missing parents
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting
> hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293110000.bk
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time
> 1410293160000 ms saved to file
> 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000',
> took 4351 bytes and 55 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Clearing checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Cleared checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting 21 missing tasks
> from Stage 24 (FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35)
> 14/09/09 20:06:00 INFO cluster.YarnClientClusterScheduler: Adding task set
> 24.0 with 21 tasks
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:0 as TID
> 91 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:0 as
> 1437 bytes in 0 ms
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Starting task 24.0:1 as TID
> 92 on executor 2: domU-12-31-39-0B-F1-D1.compute-1.internal (RACK_LOCAL)
> 14/09/09 20:06:04 INFO scheduler.TaskSetManager: Serialized task 24.0:1 as
> 1437 bytes in 0 ms
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Lost TID 91 (task 24.0:0)
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1410293154000
> not found
>         at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.Task.run(Task.scala:51)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
>
> I have seen earlier threads recommend working around this problem by setting
> spark.streaming.unpersist to false, but that had no effect on the error.  I
> have also tried changing some shuffle-related settings.  My conf (from the
> logs) is below.
>
> 14/09/09 20:04:32 INFO spark.SparkContext: Spark configuration:
> spark.app.name=foo
> spark.cleaner.ttl=86400
> spark.home=/home/hadoop/spark
> spark.jars=file:/home/hadoop/rna/rna-spark-streaming-assembly-1.0-SNAPSHOT.jar
> spark.logConf=true
> spark.master=yarn-client
> spark.shuffle.consolidateFiles=true
> spark.shuffle.memoryFraction=0.4
> spark.storage.memoryFraction=0.5
> spark.streaming.unpersist=false
> spark.yarn.dist.archives=file:/home/hadoop/rna/lib/jackson-core-asl-1.8.9.jar,file:/home/hadoop/rna/lib/jackson-mapper-asl-1.8.9.jar
>
>
> Any idea of what might be happening to this block and how I might eliminate
> this error?
>
>
>
> thanks
> p
>
>



-- 
Marcelo
