Can you try defining the case class outside the main function, in fact
outside the object? Scala 2.10 cannot provide a TypeTag for a case class
defined inside a method, and Spark SQL needs that TypeTag to infer the
schema when you call registerAsTable.
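
For example, something along these lines should compile (a minimal,
untested sketch based on your snippet; note I have also reused the
StreamingContext's SparkContext instead of creating a second one, and
swapped transform for foreachRDD since the block only produces output):

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import scala.util.parsing.json.JSON

// Top level, outside the object, so the compiler can find a TypeTag for it
case class Record(ID: String, name: String, score: String, school: String)

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    // Reuse the SparkContext the StreamingContext already created
    val sqlContext = new SQLContext(ssc.sparkContext)
    import sqlContext._

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val jsonf = lines.map(JSON.parseFull(_))
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val fields = jsonf.map(data => Record(data("type").toString,
      data("name").toString, data("score").toString, data("school").toString))

    // foreachRDD because this block only does output; the implicit
    // createSchemaRDD conversion pulled in by "import sqlContext._"
    // turns the RDD[Record] into a SchemaRDD, which is what needs the TypeTag
    fields.foreachRDD { rdd =>
      rdd.registerAsTable("table1")
      sql("select * from table1").foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}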

TD



On Tue, Jul 15, 2014 at 8:20 PM, srinivas <kusamsrini...@gmail.com> wrote:

> Hi TD,
>
> I uncommented import sqlContext._ and tried to compile the code:
>
> import java.util.Properties
> import kafka.producer._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka._
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.SparkConf
> import scala.util.parsing.json.JSON
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> object KafkaWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 4) {
>       System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
>       System.exit(1)
>     }
>
>     //StreamingExamples.setStreamingLogLevels()
>
>     val Array(zkQuorum, group, topics, numThreads) = args
>     val sparkConf = new SparkConf().setAppName("KafkaWordCount")
>     val ssc = new StreamingContext(sparkConf, Seconds(10))
>     val sql = new SparkContext(sparkConf)
>     val sqlContext = new SQLContext(sql)
>     val timer = Time(10000)
>     // ssc.checkpoint("checkpoint")
>
>     import sqlContext._
>     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
>     val jsonf = lines.map(JSON.parseFull(_)).map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>     case class Record(ID: String, name: String, score: String, school: String)
>     val fields = jsonf.map(data => Record(data("type").toString, data("name").toString, data("score").toString, data("school").toString))
>     val results = fields.transform((recrdd, tt) => {
>       recrdd.registerAsTable("table1")
>       val results = sql("select * from table1")
>       println(results)
>       results.foreach(println)
>     })
>     //results.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> but received this error:
>
> [error] /home/ubuntu/spark-1.0.0/external/jsonfile2/src/main/scala/jsonfile.scala:38: No TypeTag available for Record
> [error]  recrdd.registerAsTable("table1")
> [error]  ^
> [error] one error found
> [error] (compile:compile) Compilation failed
> [error] Total time: 17 s, completed Jul 16, 2014 3:11:53 AM
>
>
> Please advise me on how to proceed.
>
> Thanks,
> -Srinivas.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Json-file-groupby-function-tp9618p9868.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
