Hi All,
I am using Spark 1.0.0 to parse a JSON file and save the values to Cassandra
using a case class.
My code looks as follows:
import org.json4s._
import org.json4s.jackson.JsonMethods._  // parse(); json4s-jackson backend assumed
import com.datastax.spark.connector._    // adds saveToCassandra to RDDs

case class LogLine(x1: Option[String], x2: Option[String], x3: Option[List[String]],
                   x4: Option[String], x5: Option[String], x6: Option[String],
                   x7: Option[String], x8: Option[String], x9: Option[String])

// test is assumed to be an RDD[String] holding one JSON document per line
val data = test.map(line => parse(line)).map { json =>
  // Extract the values; a missing field becomes None
  implicit lazy val formats = org.json4s.DefaultFormats
  val x1 = (json \ "x1").extractOpt[String]
  val x2 = (json \ "x2").extractOpt[String]
  val x3 = (json \ "x3").extractOpt[List[String]]
  val x4 = (json \ "x4").extractOpt[String]
  val x5 = (json \ "x5").extractOpt[String]
  val x6 = (json \ "x6").extractOpt[String]
  val x7 = (json \ "x7").extractOpt[String]
  val x8 = (json \ "x8").extractOpt[String]
  val x9 = (json \ "x9").extractOpt[String]
  LogLine(x1, x2, x3, x4, x5, x6, x7, x8, x9)
}

data.saveToCassandra("test", "test_data",
  Seq("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8", "x9"))
The Cassandra table schema is as follows:
CREATE TABLE test_data (
  x1 varchar,
  x2 varchar,
  x4 varchar,
  x5 varchar,
  x6 varchar,
  x7 varchar,
  x8 varchar,
  x3 list<text>,
  x9 varchar,
  PRIMARY KEY (x1)
);
I get the following error when executing the saveToCassandra statement:
14/08/27 11:33:59 INFO SparkContext: Starting job: runJob at package.scala:169
14/08/27 11:33:59 INFO DAGScheduler: Got job 5 (runJob at package.scala:169) with 1 output partitions (allowLocal=false)
14/08/27 11:33:59 INFO DAGScheduler: Final stage: Stage 5(runJob at package.scala:169)
14/08/27 11:33:59 INFO DAGScheduler: Parents of final stage: List()
14/08/27 11:33:59 INFO DAGScheduler: Missing parents: List()
14/08/27 11:33:59 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[7] at map at <console>:45), which has no missing parents
14/08/27 11:33:59 INFO DAGScheduler: Failed to run runJob at package.scala:169
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkConf
Here, data is an org.apache.spark.rdd.RDD[LogLine] (MappedRDD[7] at map at
<console>:45).
How can I make this serializable, or is this a different problem?
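In case it helps narrow things down: Scala case classes are Serializable by default, so LogLine itself should not be the problem, and the exception names SparkConf instead. Below is a minimal plain Scala 2 sketch (no Spark required) of what I suspect is happening, assuming a SparkConf was created as a top-level val in spark-shell, which the code above does not show. FakeConf, ShellLine and ShellLineTransient are hypothetical stand-ins, not Spark classes: a closure that reads a member of the shell's line wrapper drags the whole wrapper, conf included, into Java serialization, while marking the conf @transient lets the same closure serialize.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.SparkConf: deliberately NOT Serializable.
class FakeConf { val master = "local" }

// Trimmed to two fields; case classes extend Serializable automatically.
case class LogLine(x1: Option[String], x3: Option[List[String]])

// Hypothetical model of a spark-shell line wrapper that also holds a top-level
// `val conf = new SparkConf()`. The lambda reads `prefix`, i.e. `this.prefix`,
// so serializing the lambda also serializes `this`, and with it `conf`.
class ShellLine extends Serializable {
  val conf = new FakeConf
  val prefix = "id-"
  def toLogLine: String => LogLine = s => LogLine(Some(prefix + s), None)
}

// Same wrapper, but conf is @transient, so Java serialization skips that field.
class ShellLineTransient extends Serializable {
  @transient val conf = new FakeConf
  val prefix = "id-"
  def toLogLine: String => LogLine = s => LogLine(Some(prefix + s), None)
}

object ClosureCaptureDemo {
  // Returns true if Java serialization (which Spark uses for task closures) succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(LogLine(Some("a"), Some(List("b")))))  // true
    println(serializes(new ShellLine().toLogLine))             // false: FakeConf is captured
    println(serializes(new ShellLineTransient().toLogLine))    // true
  }
}
```

If this is the cause, the analogous change in the shell would be declaring the conf as @transient val conf = new SparkConf(), though I have not verified that against the Cassandra connector.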
Please advise.
Regards,
lmk
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-while-doing-rdd-saveToCassandra-tp12906.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.