I do not see that you're importing the following: import org.apache.spark.sql._
Which I believe is where you will find the DataFrame.toDF function. HTH. -Todd On Mon, Sep 7, 2015 at 5:49 PM, Sajeevan Achuthan < [email protected]> wrote: > Hi Moon, > Thanks for the reply, I tried that option too. Unfortunately, I tried > that option too and I got the error > data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = > org.apache.spark.streaming.dstream.MappedDStream@5f3ea8bb <console>:49: > error: value toDF is not a member of org.apache.spark.rdd.RDD[CELL_KPIS] > accessLogs.toDF.registerTempTable("RAS") ^ > Any idea? > > On 7 September 2015 at 17:30, moon soo Lee <[email protected]> wrote: > >> Hi, >> >> I think you will need to convert RDD to data frame using .toDF(), like >> accessLogs.toDF.registerTempTable("RAS") >> >> Thanks, >> moon >> >> On Mon, Sep 7, 2015 at 3:34 AM Sajeevan Achuthan < >> [email protected]> wrote: >> >>> Zeppelin, an excellent tool. I am trying to implement a streaming >>> application. I get an error while deploying my application. See my code >>> below >>> >>> >>> import org.apache.spark.SparkContext >>> import org.apache.spark.SparkContext._ >>> import org.apache.spark.SparkConf >>> import org.apache.spark.streaming.StreamingContext >>> import org.apache.spark.streaming.Seconds >>> import org.apache.spark.sql.SQLContext >>> val sparkConf = new >>> SparkConf().setAppName("PEPA").setMaster("local[*]").set("spark.driver.allowMultipleContexts", >>> "true") >>> >>> import org.apache.spark.streaming.kafka._ >>> val ssc = new StreamingContext(sparkConf, Seconds(2)) >>> >>> ssc.checkpoint("checkpoint") >>> val topicMap = Map("incoming"->1) >>> >>> val record = KafkaUtils.createStream(ssc, "localhost", "1", >>> topicMap).map(_._2) >>> record.print() >>> case class >>> CELL_KPIS(ECELL_Name:String,CGI:String,Number_of_Times_Interf:Double,TAOF:Double,PHL:Double,NPCCHL:Double,LRSRP:Double,NC:Double) >>> val data = >>> record.map(s=>s.split(",")).filter(s=>s(0)!="\"ECELL_Name\"").map( >>> s=>CELL_KPIS(s(0), s(1), 
s(2).toDouble, s(3).toDouble, >>> s(5).toDouble,s(6).toDouble, s(7).toDouble, s(8).toDouble) >>> ) >>> data.foreachRDD {accessLogs => >>> import sqlContext.implicits._ >>> accessLogs.registerTempTable("RAS") >>> >>> } >>> ssc.start() >>> ssc.awaitTermination() >>> >>> And I get error >>> import org.apache.spark.SparkContext import >>> org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import >>> org.apache.spark.streaming.StreamingContext import >>> org.apache.spark.streaming.Seconds import org.apache.spark.sql.SQLContext >>> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e5779a >>> import org.apache.spark.streaming.kafka._ ssc: >>> org.apache.spark.streaming.StreamingContext = >>> org.apache.spark.streaming.StreamingContext@48621ee1 topicMap: >>> scala.collection.immutable.Map[String,Int] = Map(incoming -> 1) record: >>> org.apache.spark.streaming.dstream.DStream[String] = >>> org.apache.spark.streaming.dstream.MappedDStream@6290e75e defined class >>> CELL_KPIS data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = >>> org.apache.spark.streaming.dstream.MappedDStream@4bda38c3 >>> >>> <console>:55: error: value registerTempTable is not a member of >>> org.apache.spark.rdd.RDD[CELL_KPIS] accessLogs.registerTempTable("RAS") >>> >>> *My configuration for Zeppelin* >>> >>> >>> export MASTER=spark://localhost:7077 >>> export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_05 >>> export ZEPPELIN_PORT=9090 >>> export ZEPPELIN_SPARK_CONCURRENTSQL=false >>> export ZEPPELIN_SPARK_USEHIVECONTEXT=false >>> #'export MASTER=local[*] >>> export SPARK_HOME=/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4 >>> >>> *Interpreter configuration for spark * >>> >>> "2AW247KM7": { "id": "2AW247KM7", "name": "spark", "group": "spark", >>> "properties": { "spark.cores.max": "", "spark.yarn.jar": "", "master": >>> "local[*]", "zeppelin.spark.maxResult": "1000", "zeppelin.dep.localrepo": >>> "local-repo", "spark.app.name": "APP3", "spark.executor.memory": "5G", >>> 
"zeppelin.spark.useHiveContext": "false", >>> "spark.driver.allowMultipleContexts": "true", "args": "", "spark.home": >>> "/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4", >>> "zeppelin.spark.concurrentSQL": "true", "zeppelin.pyspark.python": "python" >>> }, "interpreterGroup": [ { "class": >>> "org.apache.zeppelin.spark.SparkInterpreter", "name": "spark" }, { "class": >>> "org.apache.zeppelin.spark.PySparkInterpreter", "name": "pyspark" }, { >>> "class": "org.apache.zeppelin.spark.SparkSqlInterpreter", "name": "sql" }, >>> { "class": "org.apache.zeppelin.spark.DepInterpreter", "name": "dep" } ], >>> "option": { "remote": true } } >>> Is there any problem in my code or setup ? >>> Any help very much appreciated. >>> >> >
