I have a Scala class that is set up to listen to a Kafka topic. It then needs to look up extra data from a service endpoint, merge it with the original message from the queue, and store the result in a C* (CQL) table.
I've been able to get most of the code running, but I'm more concerned about the approach. I start off with a stream from the topic:

```scala
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
```

Then I process each RDD, call an endpoint to collect extra data to merge, and store the result to Cassandra:

```scala
messages.foreachRDD { rdd =>
  log.info("======> Mapping data...")

  val message: RDD[(String, Message)] = rdd.map(y => {
    implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
    val jVal = org.json4s.jackson.JsonMethods.parse(y._2)
    val message = jVal.extract[Message]
    (y._1, message)
  })

  log.info("======> Data Mapped")

  message.foreachPartition(part => {
    val localLog: Logger = LogManager.getLogger("ActivityConsumer: RDD")
    localLog.info("======> Running partition...")

    part.foreach(message => {
      // calls an API to get the related entity
      val activityStr = getActivity(message._2.soUid, message._2.cUid)
      localLog.info(activityStr)

      // Now let's extract the activity, see if it exists and start storing it to Cassandra
      implicit val formats = org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
      val jVal = org.json4s.jackson.JsonMethods.parse(activityStr)
      val activityResults = jVal.extract[CASearchResults]

      val item1 = new MergeMessage(/* collect fields from both entities */)

      // ****** This is my area of concern ******
      val rdd1 = sc.parallelize(Seq(item1))
      rdd1.saveToCassandra("data-migration", "messages", SomeColumns(/* all the column defs */))
    })
  })
}
```

Looking at the code, it's basically going to store one message at a time - I've been used to storing a whole RDD to C*. Am I on the right track, or is there a better way to structure this?

Anthony
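Edit: to clarify what I mean by storing a whole RDD, this is roughly the shape I'm considering instead of the per-message `sc.parallelize`/save - parse and enrich as DStream transformations and then write each micro-batch in one go. It's only a sketch (the `MergeMessage` construction and column list are placeholders, and it assumes the connector's streaming `saveToCassandra` on a DStream via the `com.datastax.spark.connector.streaming` import), so I may well be missing something:

```scala
import com.datastax.spark.connector._             // SomeColumns, RDD.saveToCassandra
import com.datastax.spark.connector.streaming._   // DStream.saveToCassandra
import org.json4s.NoTypeHints
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.Serialization

// Parse the raw Kafka payloads once, as a DStream transformation
val parsed = messages.map { case (key, json) =>
  implicit val formats = Serialization.formats(NoTypeHints)
  (key, JsonMethods.parse(json).extract[Message])
}

// Enrich per partition so per-partition setup (formats, any HTTP client) happens once,
// and emit the merged records instead of writing them one at a time
val merged = parsed.mapPartitions { part =>
  implicit val formats = Serialization.formats(NoTypeHints)
  part.map { case (_, msg) =>
    val activityStr = getActivity(msg.soUid, msg.cUid)   // same per-message API call as above
    val activity = JsonMethods.parse(activityStr).extract[CASearchResults]
    new MergeMessage(/* collect fields from msg and activity */)
  }
}

// Write each micro-batch's whole RDD to Cassandra rather than a one-element RDD per message
merged.saveToCassandra("data-migration", "messages", SomeColumns(/* all the column defs */))
```

The API still gets called once per message, but the writes go to Cassandra a whole batch at a time and nothing inside the workers has to touch `sc`. Does that look like a better structure, or is there a better pattern again?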