I'm having trouble figuring out how to solve a problem.  I would like to
stream events from Kafka into my Spark Streaming app and write the
contents of each RDD out to an HDFS directory.  Each event that comes into
the app via Kafka will be JSON and will have an event field containing the
name of the event.  I would like to grab the event name and then write the
event out to hdfs:///user/hdfs/<eventName>.
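
For example, a message like this (the event name is just for illustration):

  { "event": "PageHit", ... }

should end up under hdfs:///user/hdfs/PageHit/.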

My first intuition was to grab the event name, put it into the RDD, then
run a foreachRDD loop and call saveAsTextFile with the event name
concatenated into the directory path.  I have pasted the code below, but
it will not work, since I cannot access the data inside an RDD from within
the foreachRDD loop.  If I dump all the RDD data into an array using
.collect, I can no longer use the .saveAsTextFile() method.  I'm at a loss
for a good way to do this.  Any ideas/help would be greatly appreciated,
thanks!


import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._  // assuming the jackson backend for parse()

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {

  def main(args: Array[String]) {
    
    val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")
    
    val eventMap = Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo",
      "Cons1", eventMap).map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      // pull the event name out of the "event" field, if present
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })
    
    eventStream.foreachRDD(rdd => {
      // rdd is an RDD[Event], so there is no single EventName to grab here
      //val eventName = rdd.EventName  //CAN'T ACCESS eventName!

      // what I would like to do if I could access eventName:
      //rdd.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName + "/rdd=" + pageHit.id)
    })
      
    
    ssc.start()
    ssc.awaitTermination()

  }
}
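
The only direction I have come up with so far is to collect just the
distinct event names to the driver inside foreachRDD, then filter and save
once per name, roughly like the sketch below (untested; the path and the
per-batch suffix are only placeholders).  I'm not sure whether rescanning
the RDD once per event name is reasonable, so better ideas are very
welcome:

    eventStream.foreachRDD((rdd, time) => {
      rdd.cache()  // the RDD gets scanned once per event name below

      // collect only the distinct event names (small) to the driver,
      // not the payloads themselves
      val names = rdd.map(_.EventName).distinct().collect()

      names.foreach { name =>
        rdd.filter(_.EventName == name)
          .saveAsTextFile("hdfs://ip-here/user/hdfs/" + name +
            "/batch=" + time.milliseconds)  // unique path per batch
      }

      rdd.unpersist()
    })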




