Hi Helena,

I think your new version only fits JSON that has a very limited set of columns. I couldn't find MonthlyCommits, but I assume it is a class with a small number of manually defined fields. In my case I have hundreds of column names, so it is not feasible to define a class for these columns.
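For what it's worth, once each message is parsed into a Map[String, Any] (as in Akhil's snippet further down the thread), the column names are simply the map's keys, so nothing needs to be hard-coded. A minimal, self-contained sketch of that idea (the sample records here are made up; in practice each Map would come from mapper.readValue[Map[String, Any]](x)):

```scala
// Sketch: collect the union of all column names seen across a batch of
// already-parsed JSON records, without hard-coding any field name.
object ColumnNames {
  def allColumns(records: Seq[Map[String, Any]]): Set[String] =
    records.foldLeft(Set.empty[String])((acc, m) => acc ++ m.keySet)

  def main(args: Array[String]): Unit = {
    // Hypothetical records standing in for parsed Kafka messages
    val parsed = Seq(
      Map[String, Any]("time" -> "2015-03-05", "To" -> "user@spark.apache.org"),
      Map[String, Any]("time" -> "2015-03-06", "repo" -> "spark")
    )
    println(allColumns(parsed).toList.sorted.mkString(","))  // prints To,repo,time
  }
}
```

On an RDD of parsed maps the same union could be taken with a map over `_.keySet` followed by a reduce with `++`, though note that collecting all keys requires a full pass over the data.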
Is there any way to get the column names instead of hard-coding "time", as in:

    mapper.readValue[Map[String,Any]](x).get("time")

Best regards,
Cui Lin

From: Helena Edelson <helena.edel...@datastax.com>
Date: Thursday, March 5, 2015 at 7:02 AM
To: Ted Yu <yuzhih...@gmail.com>
Cc: Akhil Das <ak...@sigmoidanalytics.com>, Cui Lin <cui....@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Great point :)

Cui,

Here's a cleaner way than I had before, without the use of Spark SQL for the mapping:

    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
      .map { case (k, v) => JsonParser.parse(v).extract[MonthlyCommits] }
      .saveToCassandra("githubstats", "monthly_commits")

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Cui:

You can check messages.partitions.size to determine whether messages is an empty RDD.

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

When you use KafkaUtils.createStream with StringDecoders, it will return String objects inside your messages stream.
To access the elements from the JSON, you could do something like the following:

    val mapStream = messages.map { x =>
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.readValue[Map[String, Any]](x).get("time")
    }

Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:

Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra. I have two problems:

1. I got the exception below. How can I check for an empty RDD?

    Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
        at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
        at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map { y => y._2 }
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time,To FROM tempTable")
        .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
    }

2. How can I get all column names from the JSON messages? I have hundreds of columns in the JSON-formatted message.

Thanks for your help!

Best regards,
Cui Lin
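Tying Ted's tip back to the stack trace: jsonRDD's schema inference calls reduce(), which is what throws "empty collection" on a batch with no data, so the fix is to guard each micro-batch before handing it to jsonRDD. A sketch of that guard (assuming Spark Streaming with the Cassandra connector on the classpath; `sqlContext`, `cassandra_keyspace`, and `cassandra_table` are the existing names from the code above, not defined here):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Sketch only: `messages` is the DStream[(String, String)] returned by
// KafkaUtils.createStream. Skip empty micro-batches so jsonRDD's schema
// inference never sees an empty collection.
def saveBatches(messages: DStream[(String, String)]): Unit =
  messages.foreachRDD { rdd =>
    val message: RDD[String] = rdd.map(_._2)
    // take(1) touches at most one partition's first element, which is a
    // cheaper emptiness check than counting the whole RDD.
    if (message.take(1).nonEmpty) {
      sqlContext.jsonRDD(message).registerTempTable("tempTable")
      sqlContext.sql("SELECT time, To FROM tempTable")
        .saveToCassandra(cassandra_keyspace, cassandra_table,
          SomeColumns("key", "msg"))
    }
  }
```

Checking messages.partitions.size alone can be misleading, since a batch may have non-empty partition metadata while containing no records; inspecting the data itself with take(1) avoids that edge case.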