When you use KafkaUtils.createStream with StringDecoders, the values in your messages stream will be plain String objects. To pull individual fields out of the JSON, you could do something like the following:
val mapStream = messages.map(x => {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue[Map[String, Any]](x).get("time")
})

Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:

> Friends,
>
> I'm trying to parse json formatted Kafka messages and then send them back
> to Cassandra. I have two problems:
>
> 1. I got the exception below. How to check for an empty RDD?
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> empty collection
>         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>         at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>         at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>         at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>
> val messages = KafkaUtils.createStream[String, String, StringDecoder,
>   StringDecoder](…)
>
> messages.foreachRDD { rdd =>
>   val message: RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time, To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
>
> 2. How to get all column names from the json messages? I have hundreds of
> columns in the json formatted message.
>
> Thanks for your help!
>
> Best regards,
>
> Cui Lin
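
P.S. A more complete sketch of the same idea, in case it helps. The object name, topic, ZooKeeper address, and consumer group below are placeholders I made up, and it assumes jackson-module-scala is on the classpath (in the 2.x releases of that era, ScalaObjectMapper lives in the experimental package):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object JsonTimeExtractor {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JsonTimeExtractor")
    val ssc = new StreamingContext(conf, Seconds(10))

    // createStream yields (key, value) pairs; the JSON payload is the value.
    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "json-consumer"), // placeholders
      Map("mytopic" -> 1),                                                         // placeholder topic
      StorageLevel.MEMORY_AND_DISK_SER)

    val times = messages.map { case (_, json) =>
      // Build the mapper inside the closure: ObjectMapper is not serializable,
      // so it has to be created on the executors rather than shipped from the driver.
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue[Map[String, Any]](json).get("time") // Option[Any]
    }

    times.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

If building a mapper per record becomes a bottleneck, mapPartitions lets you construct one ObjectMapper per partition instead.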