Can you try defining the case class outside the main function? In fact, outside the object altogether?
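A rough sketch of why the placement matters, using plain Scala reflection rather than Spark. The `TypeTagDemo` object, the `tagOf` helper, and the sample field values are made up for illustration; `tagOf` only stands in for any API that needs an implicit `TypeTag`, the way the implicit RDD conversion behind `registerAsTable` appears to. It assumes Scala 2.x with scala-reflect on the classpath.

```scala
import scala.reflect.runtime.universe._

// Declared at the top level, outside any object or method, so the
// compiler can materialize an implicit TypeTag[Record].
case class Record(id: String, name: String, score: String, school: String)

object TypeTagDemo {
  // Hypothetical helper: it only compiles at a call site where a
  // TypeTag for A is available.
  def tagOf[A: TypeTag](value: A): TypeTag[A] = typeTag[A]

  def main(args: Array[String]): Unit = {
    // Sample values are placeholders, not data from the thread.
    val tag = tagOf(Record("1", "alice", "90", "mit"))
    println(tag.tpe)

    // Declaring the case class locally, as in the quoted code, is
    // expected to fail to compile on Scala 2.10:
    //   case class Local(id: String)
    //   tagOf(Local("1"))   // error: No TypeTag available for Local
  }
}
```

In the streaming job, the same move would mean lifting `case class Record(...)` above `object KafkaWordCount` and leaving the rest of `main` as it is.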
TD

On Tue, Jul 15, 2014 at 8:20 PM, srinivas <kusamsrini...@gmail.com> wrote:
> Hi TD,
>
> I uncomment import sqlContext._ and tried to compile the code
>
> import java.util.Properties
> import kafka.producer._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.SparkConf
> import scala.util.parsing.json.JSON
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> object KafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>     //StreamingExamples.setStreamingLogLevels()
>
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sql = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sql)
>     val timer = Time(10000)
>     // ssc.checkpoint("checkpoint")
>
>     import sqlContext._
>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
>     val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String,Any]])
>     case class Record(ID:String,name:String,score:String,school:String)
>     val fields = jsonf.map(data=>Record(data("type").toString,data("name").toString,data("score").toString,data("school").toString))
>     val results = fields.transform((recrdd,tt) => {
>       recrdd.registerAsTable("table1")
>       val results = sql("select * from table1")
>       println(results)
>       results.foreach(println)
>     })
>     //results.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> but received the error
>
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38: No TypeTag available for Record
> [error] recrdd.registerAsTable("table1")
> [error] ^
> [error] one error found
> [error] (compile:compile) Compilation failed
> [error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM
>
> Please advice me on how to proceed
>
> Thanks,
> -Srinivas.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.