Hi Ted,

Thanks for your reply. I noticed from the link below that partitions.size will not work for checking whether a streaming RDD is empty. It seems the problem is fixed in Spark 1.3, which is not yet available for download:
https://issues.apache.org/jira/browse/SPARK-5270

Best regards,
Cui Lin

From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: Cui Lin <cui....@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Cui:
You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream. To access the elements of the JSON, you could do something like the following:

    val mapStream = messages.map { x =>
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue[Map[String, Any]](x).get("time")
    }

Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:

Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra. I have two problems:

1. I got the exception below. How do I check for an empty RDD?
    Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map { y => y._2 }
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time,To FROM tempTable")
        .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
    }

2. How can I get all column names from the JSON messages? I have hundreds of columns in each JSON-formatted message.

Thanks for your help!

Best regards,
Cui Lin
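On question 1: RDD.isEmpty() only arrives with SPARK-5270 in Spark 1.3; on earlier versions a common workaround is to guard inside foreachRDD with rdd.take(1).isEmpty, which pulls at most one element rather than materializing the batch. A Spark-free sketch of that guard on a plain Scala collection (EmptyCheck, emptyBatch, and nonEmptyBatch are hand-made stand-ins for the per-interval RDD, not Spark API):

```scala
object EmptyCheck {
  // Stand-in for `rdd.take(1).isEmpty` on an RDD: take(1) fetches at most one
  // element, so the emptiness check stays cheap even for a large batch.
  def isEmptyBatch(batch: Seq[String]): Boolean = batch.take(1).isEmpty

  def main(args: Array[String]): Unit = {
    val emptyBatch: Seq[String]    = Seq.empty
    val nonEmptyBatch: Seq[String] = Seq("""{"time":"t1","To":"a"}""")
    println(isEmptyBatch(emptyBatch))     // true
    println(isEmptyBatch(nonEmptyBatch))  // false
  }
}
```

In the streaming job the same shape would be `messages.foreachRDD { rdd => if (rdd.take(1).nonEmpty) { ... } }`, so jsonRDD never sees an empty collection.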
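On question 2: since Akhil's snippet already deserializes each message into a Map[String, Any], the column names are just the keys of that map. A minimal plain-Scala sketch of the idea (no Spark or Jackson on the classpath, so the parsed map is stubbed by hand; in a real run it would come from mapper.readValue[Map[String, Any]](x)):

```scala
object ColumnNames {
  // Given the map Jackson would produce for one message, the column names
  // are its keys; sorting makes the result deterministic.
  def columnNames(parsed: Map[String, Any]): Seq[String] =
    parsed.keys.toSeq.sorted

  def main(args: Array[String]): Unit = {
    // Hand-made stand-in for a parsed Kafka message with a few of the columns.
    val parsed: Map[String, Any] = Map(
      "time" -> "2015-03-05T11:13:00Z",
      "To"   -> "user@spark.apache.org",
      "msg"  -> "hello"
    )
    println(columnNames(parsed).mkString(","))  // To,msg,time
  }
}
```

For hundreds of columns this avoids writing the schema out by hand; alternatively, letting sqlContext.jsonRDD infer the schema and reading its fields gives the same names, at the cost of a pass over the data.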