Why not something like:

lines.foreachRDD(rdd => {

        *//Convert rdd(json) to map*
        val mapper = new ObjectMapper() with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        val myMap = mapper.readValue[Map[String,String]](x)

        val event = myMap.getOrElse("event", System.currentTimeMillis())


        rdd.saveAsTextFile("hdfs://akhldz:9000/" + event)

      })

​You​ can use the fasterxml jackson parser. Haven't tested the above code,
but i'm sure it will work.


Thanks
Best Regards

On Thu, Nov 13, 2014 at 6:27 AM, jschindler <john.schind...@utexas.edu>
wrote:

> I am having a problem trying to figure out how to solve a problem.  I would
> like to stream events from Kafka to my Spark Streaming app and write the
> contents of each RDD out to a HDFS directory.  Each event that comes into
> the app via kafka will be JSON and have an event field with the name of the
> event.  I would like to grab the event name and then write out the event to
> hdfs:///user/hdfs/<eventName>.
>
> My first intuition was to grab the event name and put it into the rdd, then
> run a forEachRDD loop and call save as text file where I concatenate the
> event name into the directory path.  I have pasted the code below but it
> will not work since I cannot access the data inside and RDD inside a
> forEachRDD loop.  If I dump all the RDD data into an array using .collect I
> wont be able to use the .saveAstextFile() method.  I'm at a loss for coming
> up with a way to do this.  Any ideas/help would be greatly appreciated,
> thanks!
>
>
> case class Event(EventName: String, Payload: org.json4s.JValue)
>
> object App {
>
>   def main(args: Array[String]) {
>
>     val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
>     ssc.checkpoint("checkpoint")
>
>     val eventMap = Map("Events" -> 1)
>     val pipe = KafkaUtils.createStream(ssc,
> "dockerrepo,dockerrepo,dockerrepo", "Cons1",  eventMap).map(_._2)
>
>     val eventStream = pipe.map(data => {
>       parse(data)
>     }).map(json => {
>       implicit val formats = DefaultFormats
>       val eventName = (json \ "event").extractOpt[String]
>       (eventName, json)
>       Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
>
>     })
>
>     eventStream.foreachRDD(event => {
>     //val eventName = event.EventName  //CAN'T ACCESS eventName!
>         event.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName +
> "/rdd="
> + pageHit.id)  //what I would like to do if I could access eventName
>     })
>
>
>     ssc.start()
>     ssc.awaitTermination()
>
>   }
> }
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-data-in-RDD-to-specify-HDFS-directory-to-write-to-tp18789.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

Reply via email to