Hi All,
A couple of days ago, I tried to integrate Spark SQL and Spark Streaming. My
understanding is that I can convert each RDD of a DStream into a SchemaRDD and
execute SQL on it, but I have had no luck so far.
Would you guys help me take a look at my code? Thank you very much!
/**
 * Spark Streaming job that consumes '#'-delimited messages from Kafka, turns
 * each message into a `Record`, and runs a SQL query over every batch.
 *
 * NOTE(review): `Record` is not defined in this object; the implicit
 * RDD-to-SchemaRDD conversion presumably requires it to be a case class
 * declared at top level -- confirm against its definition.
 */
object KafkaSpark {

  /**
   * Entry point.
   *
   * @param args `<zkQuorum> <group> <topics> <numThreads>`; `topics` is a
   *             comma-separated list and `numThreads` is parsed as an Int and
   *             used as the per-topic value of the topic map. Arguments past
   *             the fourth are ignored.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    // take(4) keeps the extractor from throwing a MatchError on extra arguments.
    val Array(zkQuorum, group, topics, numThreads) = args.take(4)

    val sparkConf = new SparkConf().setAppName("KafkaSpark")
    // Create exactly ONE SparkContext and share it between the streaming and
    // SQL contexts. Building the StreamingContext from the SparkConf would
    // create a second, separate SparkContext in the same JVM, and the
    // SQLContext would then not be bound to the context that produces the
    // stream's RDDs.
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val sqlContext = new SQLContext(sc)
    // ssc.checkpoint("checkpoint")

    // Importing the SQL context gives access to all the SQL functions and
    // implicit conversions.
    import sqlContext._

    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val recordsStream = KafkaUtils
      .createStream(ssc, zkQuorum, group, topicpMap)
      .map(t => getRecord(t._2.split("#")))

    // Register each batch as the "records" table and query it.
    recordsStream.foreachRDD { recRDD =>
      recRDD.registerAsTable("records")
      val result = sql("select * from records")
      println(result)
      // collect() brings the rows back to the driver so they are printed in the
      // driver's output; a bare foreach(println) would print on the executors.
      result.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds a `Record` from the first two fields of a split message.
   *
   * @param l fields of one message; elements 0 and 1 are read, so an array
   *          with fewer than two elements fails with an index-out-of-bounds
   *          error
   * @return the `Record` built from `l(0)` and `l(1)`
   */
  def getRecord(l: Array[String]): Record = {
    println("Getting the record")
    Record(l(0), l(1))
  }
}