Hi, 

I have the following Spark Streaming application which streams from a Kafka
topic, does some processing, and publishes the result to another topic. In
between, I read records from a Cassandra column family (CF).

The issue is that while the application is running, if a new row is inserted
into the Cassandra CF, that row is not visible to the application until I
stop and restart it.

def main(args: Array[String]) {
  ..................
  val ssc = new StreamingContext(conf, Milliseconds(2000))
  val searches = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    .map(line => parse(line._2).extract[Search])
  .....................

  // Read the Cassandra column family once and cache it
  val cqlOffers = ssc.sc.cassandraTable[Offer]("mydb", "offer").cache()

  // Key both sides for the cogroup
  val listingSearch = searches.map(search => (search.id, search))
  val listingOffers = cqlOffers.map(offer => (offer.listingId, offer))

  // Cogroup each batch of searches with the offers read from Cassandra,
  // keep listings that were actually searched, and serialise to JSON
  val result = listingSearch
    .transform(rdd => rdd.cogroup(listingOffers, 10))
    .filter { case (_, (batchSearches, _)) => batchSearches.nonEmpty }
    .map { case (listingId, (batchSearches, offers)) =>
      (batchSearches, ListingOffer(listingId, offers))
    }
    .groupByKey(10)
    .map(res => compact(Extraction.decompose(res)))

  cqlOffers.unpersist(true)

  // Publish each result record to the output Kafka topic
  result.foreachRDD { rdd =>
    rdd.collect().foreach { res =>
      producer.send(new KeyedMessage[String, String](topics_topush, res))
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
I tried caching cqlOffers and unpersisting it once the result is calculated,
without success. Is there anything I am missing?
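
As a possible workaround I was thinking of building the Cassandra RDD inside
the transform() closure, so the table is re-read on every batch interval
rather than once when the streaming graph is defined. A rough, untested
sketch of what I mean (the rest of the pipeline, i.e. the filter / map /
groupByKey steps, would be chained after it as before):

  // Sketch (untested): re-create the Cassandra RDD on every batch so new
  // rows in "mydb"."offer" become visible without restarting the application
  val result = listingSearch.transform { rdd =>
    val freshOffers = ssc.sparkContext
      .cassandraTable[Offer]("mydb", "offer")
      .map(offer => (offer.listingId, offer))
    rdd.cogroup(freshOffers, 10)
  }

Would that be the recommended way to pick up new rows, or is there a way to
refresh the cached RDD in place?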

I am not sure whether this is caused by an optimization in Spark or by an
issue in spark-cassandra-connector. In the meantime I have also reported it
on the spark-cassandra-connector tracker
(https://github.com/datastax/spark-cassandra-connector/issues/111).
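
To narrow it down I was also planning to log the number of offer rows the
job actually sees on each batch, to check whether the Cassandra RDD is being
re-evaluated per interval or simply reused. Rough sketch (untested; the
println runs on the driver, once per batch):

  // Sketch (untested): print the offer count the streaming job sees per batch
  listingSearch.transform { rdd =>
    println("offers visible this batch: " + listingOffers.count())
    rdd.cogroup(listingOffers, 10)
  }

If the count stays at the old value after new rows are inserted, the stale
data would seem to come from the RDD being reused/cached rather than from
the connector itself.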

Thanks, 
Praful


