Hi, Helena,

I think your new version only works for JSON that has a very limited set of
columns. I couldn't find MonthlyCommits, but I assume it only has a small number
of columns that are defined manually.
In my case I have hundreds of column names, so it is not feasible to define a
class for these columns.

Is there any way to get the column names instead of hard-coding "time" in this case?
 mapper.readValue[Map[String,Any]](x).get("time")
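
For illustration, once mapper.readValue returns a Map[String, Any], the column
names are simply the map's keys, so nothing needs to be hard-coded. A minimal
sketch (the field names below are made up, not from the actual messages):

```scala
// Hypothetical parsed message; in practice this Map would come from
// mapper.readValue[Map[String, Any]](x)
val row: Map[String, Any] =
  Map("time" -> "2015-03-05T09:33:00", "To" -> "user", "commits" -> 42)

// Column names are just the keys of the map -- no hard-coding needed
val columnNames: List[String] = row.keys.toList.sorted

// Values in the same order, e.g. to feed into a Cassandra row
val values: List[Any] = columnNames.map(row)

println(columnNames)  // prints: List(To, commits, time)
```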

Best regards,

Cui Lin

From: Helena Edelson <helena.edel...@datastax.com>
Date: Thursday, March 5, 2015 at 7:02 AM
To: Ted Yu <yuzhih...@gmail.com>
Cc: Akhil Das <ak...@sigmoidanalytics.com>, Cui Lin <cui....@hds.com>,
"user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Great point :) Cui, here's a cleaner way than I had before, without the use of
Spark SQL for the mapping:


KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map("github" -> 5), StorageLevel.MEMORY_ONLY)
  .map{ case (k,v) => JsonParser.parse(v).extract[MonthlyCommits]}
  .saveToCassandra("githubstats","monthly_commits")
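
For completeness: MonthlyCommits isn't shown in this thread, but it is
presumably a small case class whose fields mirror the JSON payload. A guessed
sketch (the field names here are assumptions, not from the actual code):

```scala
// Guessed shape; the real MonthlyCommits fields would mirror the JSON keys
case class MonthlyCommits(user: String, commits: Int)

val sample = MonthlyCommits("helena", 42)
```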

HELENA EDELSON
Senior Software Engineer, DSE Analytics

On Mar 5, 2015, at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.
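
Applied to the foreachRDD loop from the original message, that check might look
like the following sketch (untested against a live stream; assumes the same
`messages` stream as before):

```scala
messages.foreachRDD { rdd =>
  // Skip empty micro-batches: jsonRDD's schema inference calls reduce,
  // which throws UnsupportedOperationException on an empty collection
  if (rdd.partitions.size > 0 && rdd.take(1).nonEmpty) {
    // ... process rdd as before ...
  }
}
```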

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, it will return
(key, value) pairs of String objects in your messages stream. To access the
elements of the JSON payload, you could do something like the following:


   import com.fasterxml.jackson.databind.ObjectMapper
   import com.fasterxml.jackson.module.scala.DefaultScalaModule
   import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

   val mapStream = messages.map { x =>
      // Jackson mapper configured to deserialize into Scala collections
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      // x is a (key, value) pair from Kafka; the JSON payload is x._2
      mapper.readValue[Map[String, Any]](x._2).get("time")
    }



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them back to
Cassandra. I have two problems:

  1.  I got the exception below. How can I check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map { y => y._2 }
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time, To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}

  2.  How can I get all column names from the JSON messages? I have hundreds of
columns in the JSON-formatted messages.
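
For the second question, one possibility, sketched under the assumption that
Spark SQL's schema inference fits the use case: jsonRDD infers the schema from
the data itself, so the column names can be read off the inferred schema rather
than listed by hand (untested sketch):

```scala
messages.foreachRDD { rdd =>
  val json = rdd.map(_._2)
  if (json.take(1).nonEmpty) {
    val table = sqlContext.jsonRDD(json)
    // All inferred column names, with no per-column class definitions
    val columnNames = table.schema.fields.map(_.name)
    println(columnNames.mkString(", "))
  }
}
```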

Thanks for your help!




Best regards,

Cui Lin


