I'm trying to figure out how to solve the following problem. I would like to stream events from Kafka into my Spark Streaming app and write the contents of each RDD out to an HDFS directory. Each event that comes into the app via Kafka will be JSON and will have an "event" field containing the name of the event. I would like to grab the event name and then write the event out to hdfs:///user/hdfs/<eventName>.
My first intuition was to grab the event name, put it into the RDD, then run a foreachRDD loop and call saveAsTextFile, concatenating the event name into the directory path. I have pasted the code below, but it will not work, since I cannot access the data inside an RDD from within a foreachRDD loop. And if I dump all the RDD data into an array using .collect(), I can no longer use the .saveAsTextFile() method. I'm at a loss for coming up with a way to do this. Any ideas/help would be greatly appreciated, thanks!

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    val eventMap = Map("Events" -> 1)
    val pipe = KafkaUtils.createStream(ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap)
      .map(_._2)

    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreachRDD(event => {
      //val eventName = event.EventName //CAN'T ACCESS eventName!
      //event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName + "/rdd=" + pageHit.id) //what I would like to do if I could access eventName
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
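One way the per-event write could be sketched (an untested assumption, not a confirmed answer): inside foreachRDD you hold an RDD[Event], not a single Event, so the distinct event names can first be collected to the driver, then one filtered save issued per name. The batch-id suffix here uses System.currentTimeMillis() as a stand-in for the undefined pageHit.id above.

```scala
// Sketch, assuming eventStream: DStream[Event] as defined in the code above
// and that the set of distinct event names per batch is small.
eventStream.foreachRDD { rdd =>
  // Collect only the distinct names (small), never the full payloads.
  val names = rdd.map(_.EventName).distinct().collect()
  names.foreach { name =>
    rdd.filter(_.EventName == name)
       .saveAsTextFile(s"hdfs://ip-here/user/hdfs/$name/rdd=${System.currentTimeMillis()}")
  }
}
```

This scans the RDD once per name, so caching the RDD first (rdd.cache()) may help; an alternative worth exploring is keying the records by event name and using a MultipleTextOutputFormat via saveAsHadoopFile to split output directories in a single pass.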