Hi Cui,

Which version of Spark are you using? A possibly related bug was fixed in 
core/src/main/scala/org/apache/spark/rdd/RDD.scala, and the fix was merged into 
versions 1.3.0 and 1.2.1. If you are using 1.1.1 that may be the reason, though 
it’s a stretch: https://issues.apache.org/jira/browse/SPARK-4968

Did you verify that you have data streaming from Kafka?
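
On the empty-RDD question: one option is to skip empty batches before calling 
jsonRDD, since schema inference reduces over the records and fails on an empty 
collection (as your stack trace shows). Below is a rough sketch, not tested 
against your setup. The names EmptyBatchGuard and columnNamesIfNonEmpty are made 
up for illustration, and it assumes the Spark 1.x SQLContext.jsonRDD API you are 
already using. The take(1) check should work on older releases; as far as I 
know, RDD.isEmpty only arrived in 1.3.0. The sketch also reads the column names 
from the inferred schema, which touches on your second question.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Hypothetical helper, for illustration only; not part of Spark.
object EmptyBatchGuard {

  // Returns the column names inferred from the JSON records,
  // or None when the batch holds no records at all.
  def columnNamesIfNonEmpty(sqlContext: SQLContext,
                            json: RDD[String]): Option[Seq[String]] = {
    // take(1) fetches at most one element, so the emptiness check is cheap.
    if (json.take(1).isEmpty) {
      None
    } else {
      // Assumes the Spark 1.x jsonRDD API, which infers the schema from the data.
      val table = sqlContext.jsonRDD(json)
      // Each top-level field of the inferred schema is one column.
      Some(table.schema.fields.map(_.name).toSeq)
    }
  }
}
```

Inside foreachRDD you could call this on rdd.map(_._2) and only register the 
temp table and save to Cassandra when it returns Some(...).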

Helena
https://twitter.com/helenaedelson

On Mar 5, 2015, at 12:43 AM, Cui Lin <cui....@hds.com> wrote:

> Friends,
> 
> I'm trying to parse JSON-formatted Kafka messages and then send them to 
> Cassandra. I have two problems:
> 1. I got the exception below. How do I check for an empty RDD?
> Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
> at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
> at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
> 
> val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](…)
> messages.foreachRDD { rdd =>
>   val message: RDD[String] = rdd.map { y => y._2 }
>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>   sqlContext.sql("SELECT time,To FROM tempTable")
>     .saveToCassandra(cassandra_keyspace, cassandra_table, SomeColumns("key", "msg"))
> }
> 
> 2. How do I get all column names from the JSON messages? I have hundreds of 
> columns in each JSON-formatted message.
> 
> Thanks for your help!
> 
> 
> 
> 
> Best regards,
> 
> Cui Lin
