I am still getting the error...even if i convert it to record
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics>
<numThreads>")
System.exit(1)
}
//StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val sql = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sql)
val timer = Time(10000)
// ssc.checkpoint("checkpoint")
//import sqlContext._
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicpMap).map(_._2)
val jsonf =
lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,
Any]])
case class Record(ID:String,name:String,score:String,school:String)
val fields =
jsonf.map(data=>Record(data("type").toString,data("name").toString,data("score").toString,data("school").toString))
val results = fields.transform((recrdd,tt) => {
recrdd.registerAsTable("table1")
val results = sql("select * from table1")
println(results)
results.foreach(println)
})
//results.print()
ssc.start()
ssc.awaitTermination()
}
}
I am getting error
[error]
/home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36:
value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error] recrdd.registerAsTable("table1")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
Please look into this and let me know if i am missing any thing.
Thanks,
-Srinivas.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.