UPDATE

By systematically removing and adding pieces of the job, I have determined
that constructing the SparkContext object is what causes it to fail.

The last run contained the code below.

I apparently keep losing executors, and I'm not sure why. Some of the
relevant Spark output is below; I will add more on Monday, as I have
weekend plans.

14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078 with 8 cores, 512.0 MB RAM
14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now RUNNING
14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with curMem=1063, maxMem=309225062
14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as bytes to memory (size 1063.0 B, free 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9 MB)
14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block input-0-1416084826000
14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already exists on this machine; not re-adding it
14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721] with ID 3
14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager cloudera01.local.company.com:46926 with 294.9 MB RAM
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3 disconnected, so removing it
14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already removed): remote Akka client disassociated
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/3 removed: Command exited with code 1
14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added: app-20141115145328-0025/4 on worker-20141114114152-cloudera01.local.company.com-7078 (cloudera01.local.company.com:7078) with 8 cores

BLOCK 2 - last block before app fails:

14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager cloudera01.local.uship.com:34335 with 294.9 MB RAM
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9 disconnected, so removing it
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already removed): remote Akka client disassociated
14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated: app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor app-20141115145328-0025/9 removed: Command exited with code 1
14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED
[hdfs@cloudera01 root]$



import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark._

import org.json4s._
import org.json4s.native.JsonMethods._

import scala.collection.mutable.Map
import scala.collection.mutable.MutableList

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {

  def main(args: Array[String]) {

    val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    
    val conf = new SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
    val sc = new SparkContext(conf)



    val eventMap = scala.collection.immutable.Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)


    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {


      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)

    })


    eventStream.foreach(x => {
      var arr = x.toArray
      x.foreachPartition(y => {
        y.foreach(z => {print(z)})

      })
    })


    ssc.start()
    ssc.awaitTermination()

  }

} 
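Since the failure appears when a second SparkContext is constructed next to
the StreamingContext, one variation worth trying is to build a single
SparkConf and let the StreamingContext create the only SparkContext from it.
The following is a minimal sketch under that assumption, not a confirmed fix
for the lost executors. The object name SingleContextSketch and its two
helper methods are hypothetical; the app name "Data", the 20-second batch
interval and the "checkpoint" directory are taken from the job above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper object, for illustration only.
object SingleContextSketch {

  // One configuration for the whole application. The master URL is a
  // parameter; the app name "Data" is reused from the original job.
  def buildConf(master: String): SparkConf =
    new SparkConf().setMaster(master).setAppName("Data")

  // The StreamingContext builds its own SparkContext from conf, so no
  // separate `new SparkContext(conf)` call is made anywhere.
  def createStreamingContext(conf: SparkConf): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(20))
    ssc.checkpoint("checkpoint")
    ssc
  }
}
```

In main, this would be used as
SingleContextSketch.createStreamingContext(SingleContextSketch.buildConf("spark://cloudera01.local.company.com:7077")),
and any code that needs a plain SparkContext can reuse ssc.sparkContext
instead of constructing a new one.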





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789p19012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
