Hi, Ted,

Thanks for your reply. I noticed from the link below that partitions.size will not 
work for checking for an empty RDD in a stream. It seems the problem is fixed in 
Spark 1.3, which is not yet available for download?

https://issues.apache.org/jira/browse/SPARK-5270
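
From the JIRA it looks like the 1.3 fix is essentially a take(1) check, so a small 
helper (hypothetical name, sketch only) should give the same behaviour on 1.2:

    import org.apache.spark.rdd.RDD

    // roughly what SPARK-5270's RDD.isEmpty() does in Spark 1.3;
    // take(1) only pulls one element, so the check stays cheap
    def isEmptyCompat(rdd: RDD[_]): Boolean =
      rdd.partitions.length == 0 || rdd.take(1).length == 0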
Best regards,

Cui Lin

From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, March 5, 2015 at 6:33 AM
To: Akhil Das <ak...@sigmoidanalytics.com>
Cc: Cui Lin <cui....@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to parse Json formatted Kafka message in spark streaming

Cui:
You can check messages.partitions.size to determine whether messages is an 
empty RDD.
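
In foreachRDD that would look roughly like this (a sketch; note that SPARK-5270 
tracks cases where an empty streaming batch still reports partitions):

    messages.foreachRDD { rdd =>
      // caveat (SPARK-5270): an empty streaming batch may still have partitions > 0
      if (rdd.partitions.size > 0) {
        // process the batch
      }
    }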

Cheers

On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
When you use KafkaUtils.createStream with StringDecoders, your messages stream will 
contain (key, value) pairs of Strings. To access the elements of the JSON value, you 
could do something like the following:


   import com.fasterxml.jackson.databind.ObjectMapper
   import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}

   val mapStream = messages.map { case (_, json) =>
      // build the mapper inside the closure so it is created on the executors
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)

      // parse the message and pull out the "time" field
      mapper.readValue[Map[String, Any]](json).get("time")
    }
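
A common variant (a sketch, not required) builds one mapper per partition instead of 
per record, since constructing an ObjectMapper is relatively costly; timeStream is a 
hypothetical name, and the imports are the same as above:

    val timeStream = messages.mapPartitions { iter =>
      // one mapper per partition, reused for every record in it
      val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      iter.map { case (_, json) =>
        mapper.readValue[Map[String, Any]](json).get("time")
      }
    }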



Thanks
Best Regards

On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:
Friends,

I'm trying to parse JSON-formatted Kafka messages and then send them to Cassandra. 
I have two problems:

  1.  I got the exception below. How can I check for an empty RDD?

Exception in thread "main" java.lang.UnsupportedOperationException: empty 
collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)


val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)

messages.foreachRDD { rdd =>
  val message: RDD[String] = rdd.map(_._2)
  sqlContext.jsonRDD(message).registerTempTable("tempTable")
  sqlContext.sql("SELECT time,To FROM tempTable")
    .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
}
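
I guess a guard like the following might avoid it, since jsonRDD seems to infer the 
schema with a reduce that fails on an empty batch, but I'm not sure it's the right 
approach (sketch only, same variables as above):

    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map(_._2)
      // take(1) is a cheap emptiness check that also works on streaming batches
      if (message.take(1).nonEmpty) {
        sqlContext.jsonRDD(message).registerTempTable("tempTable")
        sqlContext.sql("SELECT time,To FROM tempTable")
          .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
      }
    }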

2. How can I get all the column names from the JSON messages? I have hundreds of 
columns in each JSON-formatted message.
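
Is something like this the intended way, assuming the SchemaRDD API in Spark 1.2 
(sketch)?

    val schemaRDD = sqlContext.jsonRDD(message)
    schemaRDD.printSchema()  // prints the full, possibly nested, inferred tree
    val columnNames = schemaRDD.schema.fields.map(_.name)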

Thanks for your help!




Best regards,

Cui Lin

