I am trying to stream events from Kafka into my Spark Streaming app and
write the contents of each RDD out to an HDFS directory. Each event that
comes into the app via Kafka is JSON and has an "event" field containing
the name of the event. I would like to grab the event name and then write
the event out to hdfs:///user/hdfs/<eventName>.

My first idea was to grab the event name and put it into the RDD, then run
a foreachRDD loop and call saveAsTextFile with the event name concatenated
into the directory path. I have pasted the code below, but it does not work
because I cannot access the data inside an RDD from within a foreachRDD
loop. If I dump all of the RDD data into an array using .collect(), I won't
be able to use the .saveAsTextFile() method. I'm at a loss for a way to do
this. Any ideas/help would be greatly appreciated, thanks!

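import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
// Assumption: the jackson backend; org.json4s.native.JsonMethods._ also provides parse.
import org.json4s.jackson.JsonMethods._

case class Event(EventName: String, Payload: org.json4s.JValue)

object App {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[6]", "Data", Seconds(20))
    ssc.checkpoint("checkpoint")

    // Topic name -> number of receiver threads
    val eventMap = Map("Events" -> 1)
    // Keep only the message value of each (key, value) pair
    val pipe = KafkaUtils.createStream(ssc,
      "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

    // Parse each message as JSON and wrap it together with its event name
    val eventStream = pipe.map(data => {
      parse(data)
    }).map(json => {
      implicit val formats = DefaultFormats
      val eventName = (json \ "event").extractOpt[String]
      Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)
    })

    eventStream.foreachRDD(rdd => {
      // `rdd` is an RDD[Event], not a single Event, so this does not compile:
      // val eventName = rdd.EventName // CAN'T ACCESS eventName!
      // What I would like to do if I could access eventName:
      // rdd.saveAsTextFile("hdfs://ip-here/user/hdfs/" + eventName + "/rdd="
      //   + pageHit.id)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}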
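
One direction that might work here, as a rough sketch rather than a tested
solution: inside foreachRDD, collect only the distinct event names to the
driver (a small array, not the whole RDD), then filter the RDD once per name
and call saveAsTextFile on each filtered RDD. The names EventWriter,
outputDir and saveByEventName are hypothetical, the json4s jackson backend
is an assumption, and using the batch time as the "rdd=" suffix is my own
stand-in for the id in the path above.

```scala
import org.apache.spark.streaming.dstream.DStream
import org.json4s.JValue
import org.json4s.jackson.JsonMethods.{compact, render} // assumption: jackson backend

case class Event(EventName: String, Payload: JValue)

// Hypothetical helper object; none of these names come from Spark itself.
object EventWriter {

  // Builds <base>/<eventName>/rdd=<batchMillis>. Characters outside
  // [A-Za-z0-9_.-] are replaced with '_' so that names such as
  // "*** NO EVENT NAME ***" do not end up as glob characters in the path.
  def outputDir(base: String, eventName: String, batchMillis: Long): String = {
    val safeName = eventName.replaceAll("[^A-Za-z0-9_.-]", "_")
    s"$base/$safeName/rdd=$batchMillis"
  }

  // Writes each batch as one directory per event name.
  def saveByEventName(events: DStream[Event], base: String): Unit =
    events.foreachRDD { (rdd, time) =>
      // The RDD is scanned once per event name, so cache it for the batch.
      rdd.cache()

      // Only the distinct names are brought back to the driver.
      val names = rdd.map(_.EventName).distinct().collect()

      names.foreach { name =>
        rdd.filter(_.EventName == name)
          .map(e => compact(render(e.Payload))) // one JSON document per line
          .saveAsTextFile(outputDir(base, name, time.milliseconds))
      }

      rdd.unpersist()
    }
}
```

With the stream from the code above, this would be called as
EventWriter.saveByEventName(eventStream, "hdfs://ip-here/user/hdfs") before
ssc.start(). The cost is one pass over the cached batch per distinct event
name, which is probably fine for a handful of event types but may not suit a
large number of them.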