Hi, I have the following Spark Streaming application, which streams from a Kafka topic, does some processing, and publishes the result to another topic. In between, I read records from a Cassandra CF.
The issue is that while the application is running, if a new row is inserted into the Cassandra CF, that new row does not become available to the application until I stop and restart it.

    def main(args: scala.Array[String]) {
      ..................
      val ssc = new StreamingContext(conf, Milliseconds(2000))
      val searches = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
        .map(line => parse(line._2).extract[Search])
      .....................
      val cqlOffers = ssc.sc.cassandraTable[Offer]("mydb", "offer").cache

      val listingSearch = searches.map(search => Tuple2(search.id, search))
      val listingOffers = cqlOffers.map(offer => Tuple2(offer.listingId, offer))

      val result = listingSearch.transform(rdd => { rdd.cogroup(listingOffers, 10) })
        .filter(lo => !lo._2._1.isEmpty)
        .map(slOff => (slOff._2, ListingOffer(slOff._1, slOff._3)))
        .groupByKey(10)
        .map(res => compact(Extraction.decompose(res)))

      cqlOffers.unpersist(true)

      result.foreachRDD(res => {
        val resStr = res.collect
        if (!resStr.isEmpty) {
          for (i <- 0 to resStr.length - 1) {
            producer.send(new KeyedMessage[String, String](topics_topush, "" + resStr(i)))
          }
        }
      })

      ssc.start()
      ssc.awaitTermination()
    }

I have tried caching cqlOffers and unpersisting it once the result is calculated, without success. Is there anything that I am missing? I am not sure whether this is due to an optimization in Spark or an issue within spark-cassandra-connector. In the meantime I have also posted this issue on spark-cassandra-connector (https://github.com/datastax/spark-cassandra-connector/issues/111).

Thanks,
Praful
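P.S. One thing I plan to try next (I am not sure whether this is the intended usage of the connector) is to create the Cassandra RDD inside the transform call, so that every batch issues a fresh table scan instead of reusing the RDD built once at startup. A rough sketch, reusing the names from my code above (Offer, listingId, "mydb"/"offer"; freshOffers is just a local name):

    import com.datastax.spark.connector._

    // Build the offers RDD per batch instead of once at startup.
    // Without .cache, each batch should re-scan the "offer" table and
    // therefore see rows inserted after the application started.
    val result = listingSearch.transform(rdd => {
      val freshOffers = rdd.sparkContext
        .cassandraTable[Offer]("mydb", "offer")
        .map(offer => (offer.listingId, offer))
      rdd.cogroup(freshOffers, 10)
    })

The downside would be a full read of the offer CF on every 2-second batch, so I am not sure this is acceptable either.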