Why not something like the following? You can use the fasterxml jackson parser:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

    lines.foreachRDD(rdd => {
      // Convert the first record (JSON) of the batch to a map.
      // Assumes every record in this batch carries the same event name;
      // take(1) also guards against an empty batch.
      rdd.take(1).headOption.foreach { first =>
        val mapper = new ObjectMapper() with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        val myMap = mapper.readValue[Map[String, String]](first)
        val event = myMap.getOrElse("event", System.currentTimeMillis().toString)
        rdd.saveAsTextFile("hdfs://akhldz:9000/" + event)
      }
    })

Haven't tested the above code, but I'm fairly sure it will work.
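If a single batch can contain several different event types, the one-name-per-batch assumption above breaks down. A rough, untested sketch of an alternative, written against the eventStream: DStream[Event] from your code below (the "/batch=" path suffix is just an illustrative way to keep output directories unique per interval):

    eventStream.foreachRDD(rdd => {
      // Cache, since we scan the batch once per distinct event name.
      rdd.cache()
      // Collect the distinct event names to the driver
      // (assumed to be a small set per batch).
      val names = rdd.map(_.EventName).distinct().collect()
      names.foreach { name =>
        // Save each event type under its own HDFS directory.
        rdd.filter(_.EventName == name)
          .saveAsTextFile("hdfs://ip-here/user/hdfs/" + name +
            "/batch=" + System.currentTimeMillis())
      }
      rdd.unpersist()
    })

The collect() here only pulls the event names, not the payloads, so it avoids the problem you ran into with collecting the whole RDD.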
Thanks
Best Regards

On Thu, Nov 13, 2014 at 6:27 AM, jschindler <john.schind...@utexas.edu> wrote:
> I'm having trouble figuring out how to solve the following problem. I would
> like to stream events from Kafka to my Spark Streaming app and write the
> contents of each RDD out to an HDFS directory. Each event that comes into
> the app via Kafka will be JSON and will have an event field with the name
> of the event. I would like to grab the event name and then write the event
> out to hdfs:///user/hdfs/<eventName>.
>
> My first intuition was to grab the event name and put it into the RDD, then
> run a foreachRDD loop and call saveAsTextFile, concatenating the event name
> into the directory path. I have pasted the code below, but it will not work
> since I cannot access the data inside an RDD from within a foreachRDD loop.
> If I dump all the RDD data into an array using .collect() I won't be able
> to use the .saveAsTextFile() method. I'm at a loss for coming up with a way
> to do this. Any ideas/help would be greatly appreciated, thanks!
>
> case class Event(EventName: String, Payload: org.json4s.JValue)
>
> object App {
>
>   def main(args: Array[String]) {
>
>     val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
>     ssc.checkpoint("checkpoint")
>
>     val eventMap = Map("Events" -> 1)
>     val pipe = KafkaUtils.createStream(ssc,
>       "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)
>
>     val eventStream = pipe.map(data => {
>       parse(data)
>     }).map(json => {
>       implicit val formats = DefaultFormats
>       val eventName = (json \ "event").extractOpt[String]
>       Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
>     })
>
>     eventStream.foreachRDD(event => {
>       //val eventName = event.EventName //CAN'T ACCESS eventName!
>       event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName +
>         "/rdd=" + pageHit.id) //what I would like to do if I could access eventName
>     })
>
>     ssc.start()
>     ssc.awaitTermination()
>
>   }
> }