When you use KafkaUtils.createStream with StringDecoders, your messages
stream will contain (key, value) pairs of plain Strings. To access the
elements of the JSON value, you could do something like the following:


   import com.fasterxml.jackson.databind.ObjectMapper
   import com.fasterxml.jackson.module.scala.DefaultScalaModule
   import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

   val mapStream = messages.map { case (_, json) =>
      // Build a Jackson mapper and parse the JSON value into a Map
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      mapper.readValue[Map[String, Any]](json).get("time")
   }
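
For the "empty collection" error in your first question: jsonRDD has to
infer a schema from at least one record, so it fails on an empty batch.
One option is to skip empty RDDs before calling it (newer Spark versions
also have rdd.isEmpty()). A rough sketch along the lines of your snippet,
with your cassandra_keyspace/cassandra_table names and column mapping
kept as placeholders:

   messages.foreachRDD { rdd =>
      val message = rdd.map(_._2)
      // Skip batches with no records so jsonRDD can infer a schema
      if (!message.take(1).isEmpty) {
         sqlContext.jsonRDD(message).registerTempTable("tempTable")
         sqlContext.sql("SELECT time, To FROM tempTable")
            .saveToCassandra(cassandra_keyspace, cassandra_table,
               SomeColumns("key", "msg"))
      }
   }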

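For your second question, since jsonRDD infers the schema for you, you
can read the column names back from it instead of listing hundreds of
fields by hand. A small sketch, assuming a non-empty batch:

   val schemaRdd = sqlContext.jsonRDD(message)
   schemaRdd.printSchema()                          // dump the inferred schema
   val columnNames = schemaRdd.schema.fields.map(_.name)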


Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:

>   Friends,
>
>   I'm trying to parse JSON-formatted Kafka messages and then send them back
> to Cassandra. I have two problems:
>
>    1. I got the exception below. How to check an empty RDD?
>
>  Exception in thread "main" java.lang.UnsupportedOperationException:
> empty collection
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>  at scala.Option.getOrElse(Option.scala:120)
>  at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>  at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>  at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>
>  val messages = KafkaUtils.createStream[String, String, StringDecoder, 
> StringDecoder](…)
>
> messages.foreachRDD { rdd =>
>   val message:RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", 
> "msg"))
> }
>
>
>  2. How do I get all column names from the JSON messages? I have hundreds of
> columns in the JSON-formatted message.
>
>  Thanks for your help!
>
>
>
>
>  Best regards,
>
>  Cui Lin
>
