Zeppelin is an excellent tool. I am trying to implement a streaming
application, but I get an error when I deploy it. See my code below:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SQLContext
val sparkConf = new SparkConf()
  .setAppName("PEPA")
  .setMaster("local[*]")
  .set("spark.driver.allowMultipleContexts", "true")

import org.apache.spark.streaming.kafka._

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = Map("incoming" -> 1)
val record = KafkaUtils.createStream(ssc, "localhost", "1", topicMap).map(_._2)
record.print()

case class CELL_KPIS(ECELL_Name: String, CGI: String,
  Number_of_Times_Interf: Double, TAOF: Double, PHL: Double,
  NPCCHL: Double, LRSRP: Double, NC: Double)

val data = record
  .map(s => s.split(","))
  .filter(s => s(0) != "\"ECELL_Name\"")
  .map(s => CELL_KPIS(s(0), s(1), s(2).toDouble, s(3).toDouble,
    s(5).toDouble, s(6).toDouble, s(7).toDouble, s(8).toDouble))

data.foreachRDD { accessLogs =>
  import sqlContext.implicits._
  accessLogs.registerTempTable("RAS")
}
ssc.start()
ssc.awaitTermination()
And I get this error:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.sql.SQLContext
sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e5779a
import org.apache.spark.streaming.kafka._
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@48621ee1
topicMap: scala.collection.immutable.Map[String,Int] = Map(incoming -> 1)
record: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@6290e75e
defined class CELL_KPIS
data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] = org.apache.spark.streaming.dstream.MappedDStream@4bda38c3
<console>:55: error: value registerTempTable is not a member of org.apache.spark.rdd.RDD[CELL_KPIS]
       accessLogs.registerTempTable("RAS")
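Reading the error, registerTempTable is not a method on RDD in Spark 1.3; it lives on DataFrame. So I suspect the RDD needs a toDF() conversion before it can be registered. Here is a minimal sketch of what I think the fix looks like (untested in Zeppelin; it builds the SQLContext from the RDD's own SparkContext so it matches the streaming context rather than Zeppelin's built-in one):

data.foreachRDD { accessLogs =>
  // Build a SQLContext from the RDD's own SparkContext so it matches
  // the context the streaming data comes from. (A singleton would be
  // better than re-creating it every batch.)
  val sqlContext = new org.apache.spark.sql.SQLContext(accessLogs.sparkContext)
  import sqlContext.implicits._
  // In Spark 1.3, registerTempTable is a DataFrame method, so convert first.
  accessLogs.toDF().registerTempTable("RAS")
}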
*My configuration for Zeppelin*
export MASTER=spark://localhost:7077
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_05
export ZEPPELIN_PORT=9090
export ZEPPELIN_SPARK_CONCURRENTSQL=false
export ZEPPELIN_SPARK_USEHIVECONTEXT=false
# export MASTER=local[*]
export SPARK_HOME=/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4
*Interpreter configuration for spark*
"2AW247KM7": { "id": "2AW247KM7", "name": "spark", "group": "spark",
"properties": { "spark.cores.max": "", "spark.yarn.jar": "", "master":
"local[*]", "zeppelin.spark.maxResult": "1000", "zeppelin.dep.localrepo":
"local-repo", "spark.app.name": "APP3", "spark.executor.memory": "5G",
"zeppelin.spark.useHiveContext": "false",
"spark.driver.allowMultipleContexts": "true", "args": "", "spark.home":
"/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4",
"zeppelin.spark.concurrentSQL": "true", "zeppelin.pyspark.python": "python"
}, "interpreterGroup": [ { "class":
"org.apache.zeppelin.spark.SparkInterpreter", "name": "spark" }, { "class":
"org.apache.zeppelin.spark.PySparkInterpreter", "name": "pyspark" }, {
"class": "org.apache.zeppelin.spark.SparkSqlInterpreter", "name": "sql" },
{ "class": "org.apache.zeppelin.spark.DepInterpreter", "name": "dep" } ],
"option": { "remote": true } }
Is there any problem in my code or setup?
Any help is very much appreciated.