Why not something like:
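import com.fasterxml.jackson.databind.ObjectMapper

lines.foreachRDD { (rdd, time) =>
  // Tag each JSON record with its "event" field; parsing runs on the executors
  val keyed = rdd.mapPartitions { records =>
    val mapper = new ObjectMapper() // one parser per partition, not per record
    records.map { json =>
      val event = Option(mapper.readTree(json).get("event")).map(_.asText).getOrElse("NO_EVENT")
      (event, json)
    }
  }.cache()
  // One directory per event name, with one sub-directory per batch
  keyed.map(_._1).distinct().collect().foreach { event =>
    keyed.filter(_._1 == event).map(_._2).saveAsTextFile(s"hdfs://akhldz:9000/$event/${time.milliseconds}")
  }
  keyed.unpersist()
}
You can't read eventName inside foreachRDD because the closure receives a whole RDD, not individual events. The name has to be extracted per record on the executors. Only the distinct names are brought back to the driver, which then issues one saveAsTextFile per name. This uses the FasterXML Jackson parser (only jackson-databind is needed here). I haven't tested the above code, so you may need to adjust it.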
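Filtering once per event name costs one Spark job per distinct name in every batch. A commonly used alternative is to key each record by its event name and let Hadoop's old-API MultipleTextOutputFormat choose the sub-directory, so each batch is written in a single pass. This is a sketch under assumptions: it expects a DStream[(String, String)] of (event name, JSON text) pairs. The class and object names and the <baseDir>/<batchMillis>/<eventName>/part-NNNNN layout are illustrative choices, not something from this thread.

```scala
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.SparkContext._ // pair-RDD implicits; only needed on Spark < 1.3
import org.apache.spark.streaming.dstream.DStream

// Hypothetical output format: routes each (eventName, line) pair to <outputDir>/<eventName>/part-NNNNN
class EventDirectoryOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Replace the key with NullWritable so TextOutputFormat writes only the JSON line
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

  // Use the key (the event name) as a sub-directory in front of the default part-file name
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    EventDirectoryOutputFormat.relativePath(key, name)
}

object EventDirectoryOutputFormat {
  // Kept separate so the naming rule can be checked without a cluster
  def relativePath(key: Any, name: String): String = s"$key/$name"
}

object EventDirectories {
  // `keyed` holds (eventName, json) pairs, however you choose to build them
  def save(keyed: DStream[(String, String)], baseDir: String): Unit =
    keyed.foreachRDD { (rdd, time) =>
      // saveAsHadoopFile will not write into an existing directory, so each batch gets its own base
      rdd.saveAsHadoopFile(s"$baseDir/${time.milliseconds}",
        classOf[String], classOf[String], classOf[EventDirectoryOutputFormat])
    }
}
```

The trade-off is layout: because each batch needs a fresh output path, the per-event directories sit one level below a batch directory rather than directly at hdfs:///user/hdfs/<eventName>.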
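Since the directory name comes straight from the data, it may help to keep the name-to-path logic in plain functions that can be unit-tested without Spark. A minimal sketch, assuming each record has already been parsed into a Map[String, Any]. The NO_EVENT fallback, the rule rejecting empty names or names containing "/", and the <base>/<event>/<batchMillis> layout are all hypothetical choices for illustration.

```scala
object EventPaths {
  // Hypothetical fallback directory for records without a usable "event" field
  val NoEvent = "NO_EVENT"

  // Take the event name from an already-parsed record; reject values that
  // are not strings, are empty, or would create nested directories
  def eventName(record: Map[String, Any]): String =
    record.get("event") match {
      case Some(name: String) if name.nonEmpty && !name.contains("/") => name
      case _ => NoEvent
    }

  // <base>/<event>/<batchMillis>, tolerating a trailing slash on base
  def outputDir(base: String, event: String, batchMillis: Long): String =
    s"${base.stripSuffix("/")}/$event/$batchMillis"

  // Group one batch of parsed records by the directory each should be written to
  def plan(base: String, batchMillis: Long,
           records: Seq[Map[String, Any]]): Map[String, Seq[Map[String, Any]]] =
    records.groupBy(r => outputDir(base, eventName(r), batchMillis))
}
```

In a streaming job, eventName would be called in the per-partition parsing step and outputDir when building the save path.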
Thanks
Best Regards
On Thu, Nov 13, 2014 at 6:27 AM, jschindler wrote:
> I'm having trouble figuring out how to solve a problem. I would
> like to stream events from Kafka to my Spark Streaming app and write the
> contents of each RDD out to an HDFS directory. Each event that comes into
> the app via Kafka will be JSON and have an "event" field with the name of the
> event. I would like to grab the event name and then write out the event to
> hdfs:///user/hdfs/<eventName>.
>
> My first intuition was to grab the event name and put it into the RDD, then
> run a foreachRDD loop and call saveAsTextFile, concatenating the
> event name into the directory path. I have pasted the code below, but it
> will not work since I cannot access the data inside an RDD inside a
> foreachRDD loop. If I dump all the RDD data into an array using .collect(), I
> won't be able to use the .saveAsTextFile() method. I'm at a loss for
> a way to do this. Any ideas/help would be greatly appreciated,
> thanks!
>
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.json4s._
> import org.json4s.jackson.JsonMethods._
>
> case class Event(EventName: String, Payload: JValue)
>
> object App {
>
>   def main(args: Array[String]) {
>
>     val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
>     ssc.checkpoint("checkpoint")
>
>     val eventMap = Map("Events" -> 1)
>     val pipe = KafkaUtils.createStream(ssc,
>       "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)
>
>     val eventStream = pipe.map(data => {
>       parse(data)
>     }).map(json => {
>       implicit val formats = DefaultFormats
>       val eventName = (json \ "event").extractOpt[String]
>       Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
>     })
>
>     eventStream.foreachRDD(event => {
>       //val eventName = event.EventName //CAN'T ACCESS eventName!
>       event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName +
>         "/rdd=" + pageHit.id) //what I would like to do if I could access eventName
>     })
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.