Hi Guys,

Good day!

I have a question about how a consumer group can consume only the
messages that are intended for it.

Here's the scenario:

Publish message "A" to topic "X"
Consumer Group A consumes message "A" from topic "X"

Publish message "B" to topic "X"
Consumer Group B consumes message "B" from topic "X"

How can I achieve this in Scala? At the moment both consumers consume
every message, even though a groupId is assigned.
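
To make the goal concrete, here is a small Kafka-free sketch of what I
see today versus what I am after. It is only a model of the behaviour,
not real consumer code: Message, targetGroup, deliver and accept are
hypothetical names, and it assumes each message could carry the name of
the group it is meant for.

```scala
// Kafka-free model of the scenario above. All names here
// (Message, targetGroup, deliver, accept) are hypothetical.
final case class Message(targetGroup: String, payload: String)

object GroupRouting {
  // What I observe today: every consumer group subscribed to the topic
  // receives its own copy of every message, whatever its groupId is.
  def deliver(topicLog: Seq[Message], groupId: String): Seq[Message] =
    topicLog

  // What I want: a group only processes the messages addressed to it.
  // Here that is modelled as a filter on an assumed targetGroup tag.
  def accept(topicLog: Seq[Message], groupId: String): Seq[String] =
    deliver(topicLog, groupId).collect {
      case Message(target, payload) if target == groupId => payload
    }
}
```

With a log holding message "A" tagged for Consumer_Group_A and message
"B" tagged for Consumer_Group_B, deliver hands both messages to either
group, while accept leaves each group with only its own message.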

Here is my producer code:

import java.util.UUID
import org.apache.kafka.clients.producer.KafkaProducer

def main(args: Array[String]): Unit = {
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"
  val eventType: String = "delete"
  val deleteRetention: Int = 5240000
  val producerConfig = ProducerService.createProducerConfig(
    zookeeperUrl, kafkaServerUrl, topic, groupId, deleteRetention.toString)
  val producer = new KafkaProducer[String, String](producerConfig)
  // JSON payload with random batch and document IDs
  val json =
    s"""{"batch_id":"${UUID.randomUUID()}", "document_id":"${UUID.randomUUID()}", "type":"$eventType"}"""
  val producerPayload = ProducerPayload(json, "", topic, groupId, deleteRetention)
  ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl, producerPayload)
  logger.info("Done.")
}


Here is my consumer code:

def consumeMessage = Action { implicit rs =>
  val zookeeperUrl: String = "localhost:2182"
  val kafkaServerUrl: String = "localhost:9092,localhost:9093"
  val topic: String = "Topic_X"
  val groupId: String = "Consumer_Group_A"
  val config = ConsumerService.createConsumerConfig(zookeeperUrl, kafkaServerUrl, groupId)
  val consumer = kafka.consumer.Consumer.create(config)
  // Request a single stream for the topic
  val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
  val streams = consumerMap(topic)
  val it = streams(0).iterator()
  // hasNext() blocks until a message arrives unless consumer.timeout.ms is set
  while (it.hasNext()) {
    val msg = new String(it.next().message())
    logger.info(s"Message successfully consumed from topic $topic => $msg")
  }
  consumer.shutdown()
  logger.info("Done.")
  Ok
}




Your help is much appreciated. Thank you.

Sincerely yours,

Rico Nodalo Lugod
Senior Java / J2EE / SOA - Developer
Cebu City, Philippines 6000

Email: rnl2...@gmail.com
