I am still getting the error, even if I convert it to a record:

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val sql = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sql)
    val timer = Time(10000)
    // ssc.checkpoint("checkpoint")
    //import sqlContext._
    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    case class Record(ID:String,name:String,score:String,school:String)
    val fields = jsonf.map(data=>Record(data("type").toString,data("name").toString,data("score").toString,data("school").toString))
    val results = fields.transform((recrdd,tt) => {
      recrdd.registerAsTable("table1")
      val results = sql("select * from table1")
      println(results)
      results.foreach(println)
    })
    //results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

I am getting the following error:

[error] /home/ubuntu/spark-1.0.0/external/jsonfile/src/main/scala/jsonfile.scala:36: value registerAsTable is not a member of org.apache.spark.rdd.RDD[Record]
[error] recrdd.registerAsTable("table1")
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

Please look into this and let me know if I am missing anything.

Thanks,
-Srinivas.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.