UPDATE: I have been removing and adding pieces of the job systematically, and I have figured out that constructing the SparkContext object is what causes it to fail.
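Concretely, the job builds two contexts: a StreamingContext (which creates its own SparkContext under the hood) and a second, standalone SparkContext. These are the two constructions in question, pulled from the full program further down:

    val ssc = new StreamingContext("local[2]", "Data", Seconds(20))  // first context, local master
    ...
    val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
    val sc = new SparkContext(conf)                                  // second context, cluster master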
The last run contained the code below. I keep losing executors and I'm not sure why. Some of the relevant Spark output is below; I will add more on Monday, as I must go participate in weekend activities.

BLOCK 1:

14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078 with 8 cores, 512.0 MB RAM
14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now RUNNING
14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with curMem=1063, maxMem=309225062
14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as bytes to memory (size 1063.0 B, free 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block input-0-1416084826000
14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already exists on this machine; not re-adding it
14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721] with ID 3
14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager cloudera01.local.company.com:46926 with 294.9 MB RAM
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3 disconnected, so removing it
14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already removed): remote Akka client disassociated
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/3 removed: Command exited with code 1
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added: app-20141115145328-0025/4 on worker-20141114114152-cloudera01.local.company.com-7078 (cloudera01.local.company.com:7078) with 8 cores

BLOCK 2 - last block before app fails:

14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager cloudera01.local.uship.com:34335 with 294.9 MB RAM
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9 disconnected, so removing it
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already removed): remote Akka client disassociated
14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/9 removed: Command exited with code 1
14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been killed.
Reason: Master removed our application: FAILED
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED

The code from the last run:

import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.json4s._
import org.json4s.native.JsonMethods._
import scala.collection.mutable.Map
import scala.collection.mutable.MutableList

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {

  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
    val sc = new SparkContext(conf)

    val eventMap = scala.collection.immutable.Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreach(x => {
      var arr = x.toArray
      x.foreachPartition(y => {
        y.foreach(z => { print(z) })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
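My plan for Monday is to drop the second context entirely and build the StreamingContext from a single SparkConf pointed at the cluster master. Below is a minimal, untested sketch of what I intend to try; the master URL, app name, and Kafka settings are just carried over from the code above, and I have also swapped the deprecated DStream.foreach and the driver-side RDD.toArray for foreachRDD with a plain foreachPartition:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import org.json4s._
import org.json4s.native.JsonMethods._

case class Event(EventName: String, Payload: JValue)

object App {

  def main(args: Array[String]) {

    // One SparkConf, one context: the StreamingContext creates its own
    // SparkContext from this conf, so no second context is ever built.
    val conf = new SparkConf()
      .setMaster("spark://cloudera01.local.company.com:7077")
      .setAppName("Data")
    val ssc = new StreamingContext(conf, Seconds(20))
    ssc.checkpoint("checkpoint")

    val eventMap = Map("Events" -> 1)
    val pipe = KafkaUtils
      .createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap)
      .map(_._2)

    // Same parsing logic as above: extract the event name, keep the raw JSON.
    val eventStream = pipe.map(data => parse(data)).map { json =>
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    }

    // foreachRDD instead of the deprecated foreach, and no toArray
    // (which would collect every batch to the driver before printing).
    eventStream.foreachRDD { rdd =>
      rdd.foreachPartition(_.foreach(print))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

If I understand correctly, the StreamingContext registers a single application with the master this way, so there is no second application competing with the streaming job for the cluster's executors.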