See the following thread for the 1.3.0 release: http://search-hadoop.com/m/JW1q5hV8c4
Looks like the release is around the corner.

On Thu, Mar 5, 2015 at 3:26 PM, Cui Lin <cui....@hds.com> wrote:

> Hi, Ted,
>
> Thanks for your reply. I noticed from the link below that partitions.size
> will not work for checking an empty RDD in streams. It seems that the
> problem is solved in Spark 1.3, which is not yet available for download
> at this time?
>
> https://issues.apache.org/jira/browse/SPARK-5270
>
> Best regards,
>
> Cui Lin
>
> From: Ted Yu <yuzhih...@gmail.com>
> Date: Thursday, March 5, 2015 at 6:33 AM
> To: Akhil Das <ak...@sigmoidanalytics.com>
> Cc: Cui Lin <cui....@hds.com>, "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: How to parse Json formatted Kafka message in spark streaming
>
> Cui:
> You can check messages.partitions.size to determine whether messages is
> an empty RDD.
>
> Cheers
>
> On Thu, Mar 5, 2015 at 12:52 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> When you use KafkaUtils.createStream with StringDecoders, it will
>> return String objects inside your messages stream. To access the
>> elements from the JSON, you could do something like the following:
>>
>> val mapStream = messages.map(x => {
>>   val mapper = new ObjectMapper() with ScalaObjectMapper
>>   mapper.registerModule(DefaultScalaModule)
>>
>>   mapper.readValue[Map[String, Any]](x).get("time")
>> })
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Mar 5, 2015 at 11:13 AM, Cui Lin <cui....@hds.com> wrote:
>>
>>> Friends,
>>>
>>> I'm trying to parse JSON formatted Kafka messages and then send them
>>> back to Cassandra. I have two problems:
>>>
>>> 1. I got the exception below. How do I check for an empty RDD?
>>>
>>> Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
>>>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
>>>   at scala.Option.getOrElse(Option.scala:120)
>>>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
>>>   at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
>>>   at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
>>>   at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)
>>>
>>> val messages = KafkaUtils.createStream[String, String, StringDecoder,
>>>   StringDecoder](…)
>>>
>>> messages.foreachRDD { rdd =>
>>>   val message: RDD[String] = rdd.map { y => y._2 }
>>>   sqlContext.jsonRDD(message).registerTempTable("tempTable")
>>>   sqlContext.sql("SELECT time, To FROM tempTable")
>>>     .saveToCassandra(cassandra_keyspace, cassandra_table,
>>>       SomeColumns("key", "msg"))
>>> }
>>>
>>> 2. How do I get all column names from the JSON messages? I have
>>> hundreds of columns in the JSON formatted message.
>>>
>>> Thanks for your help!
>>>
>>> Best regards,
>>>
>>> Cui Lin
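A minimal sketch of the empty-RDD guard discussed in this thread, assuming the same messages stream and sqlContext from Cui Lin's code above; rdd.take(1) works on releases before 1.3, while RDD.isEmpty() (the SPARK-5270 fix) is only available from 1.3 onward:

    import org.apache.spark.rdd.RDD

    // Skip batches that carry no Kafka messages, so that
    // SQLContext.jsonRDD never sees an empty collection.
    messages.foreachRDD { rdd =>
      val message: RDD[String] = rdd.map { case (_, value) => value }
      // Pre-1.3: take(1) pulls at most one element to the driver.
      // From 1.3 on, message.isEmpty() (SPARK-5270) does the same check.
      if (message.take(1).nonEmpty) {
        sqlContext.jsonRDD(message).registerTempTable("tempTable")
        sqlContext.sql("SELECT time, To FROM tempTable")
          .saveToCassandra(cassandra_keyspace, cassandra_table,
            SomeColumns("key", "msg"))
      }
    }

For the second question, jsonRDD infers a schema from the messages, so the column names can be read back from that schema rather than listed by hand. A sketch, assuming a non-empty batch:

    // The inferred StructType holds one StructField per JSON key.
    val table = sqlContext.jsonRDD(message)
    val columnNames = table.schema.fields.map(_.name)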