I have a Scala class that is set up to listen to a Kafka Topic. It then
needs to look up a service endpoint for extra data, merge it with the
original message from the queue and store it to a C* CQL table.

I've been able to get most of the code running, but I'm more concerned
about the approach.


I start off with streaming from the Topic:

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)

Then I process each RDD, calling an endpoint to collect the extra data to
merge, and store the result to Cassandra:

    messages.foreachRDD { rdd =>

      log.info("======> Mapping data...")

      val message:RDD[(String, Message)] = rdd.map( y => {
        implicit val formats =
          org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
        val jVal = org.json4s.jackson.JsonMethods.parse(y._2)
        val message = jVal.extract[Message]
        (y._1, message)
      })

      log.info("======> Data Mapped")

      message.foreachPartition(part => {
        val localLog:Logger = LogManager.getLogger("ActivityConsumer: RDD")
        localLog.info("======> Running partition...")

        part.foreach( message => {
          //calls an API to get related entity
          val activityStr = getActivity(message._2.soUid, message._2.cUid)
          localLog.info(activityStr)

          // Now let's extract the activity, see if it exists and start
          // storing it to cassandra
          implicit val formats =
            org.json4s.jackson.Serialization.formats(org.json4s.NoTypeHints)
          val jVal = org.json4s.jackson.JsonMethods.parse(activityStr)
          val activityResults = jVal.extract[CASearchResults]

          val item1 = new MergeMessage(/* collect fields from both entities */)

          // ****** This is my area of concern ******
          val rdd1 = sc.parallelize(Seq(item1))
          rdd1.saveToCassandra("data-migration", "messages",
            SomeColumns(/* all the column defs */))
        })
      })
    }

Looking at the code, it is basically going to store just one message at a
time, whereas I've been used to storing a whole RDD to C*. Am I on the right
track, or is there a better way to structure this?
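
To make the comparison concrete, below is a rough sketch of the direction I
am wondering about: do the lookup and merge as a per-partition
transformation, and leave the write to a single saveToCassandra call on the
resulting RDD. The Message and MergeMessage case classes and the lookup
parameter are simplified stand-ins made up for this sketch (only soUid and
cUid come from the real code), and the Spark wiring is shown only as a
comment, so please read it as an assumption rather than tested code.

```scala
// Hypothetical stand-ins for the real classes; only soUid and cUid
// are taken from the code above.
final case class Message(soUid: String, cUid: String)
final case class MergeMessage(key: String, soUid: String, cUid: String, activity: String)

object PartitionEnrichment {
  // Lazily transforms one partition: each message is paired with the
  // result of the lookup and turned into a MergeMessage.
  // Nothing is written to Cassandra here.
  def enrichPartition(
      part: Iterator[(String, Message)],
      lookup: (String, String) => String // stand-in for getActivity
  ): Iterator[MergeMessage] =
    part.map { case (key, msg) =>
      MergeMessage(key, msg.soUid, msg.cUid, lookup(msg.soUid, msg.cUid))
    }
}

// Intended use inside foreachRDD (assumption, needs Spark and the
// Cassandra connector on the classpath, so it is left as a comment):
//   message
//     .mapPartitions(p => PartitionEnrichment.enrichPartition(p, getActivity))
//     .saveToCassandra("data-migration", "messages", SomeColumns(...))
```

The idea would be that the RDD of merged records is saved as a whole, rather
than creating a one-element RDD per message.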

Anthony
