Hi Guys, Good day!
I have question regarding how to consume a specific message belongs to that consumer group? Here's the scenario: Publish message "A" to topic "X" Consume by Consumer Group A the message "A" from topic "X" Publish message "B" to topic "X" Consume by Consumer Group B the message "B" from topic "X" How can achieve this in scala? Because both consumers will consume the message even if it assigned a groupId. Here is my producer code: def main(args: Array[String]): Unit = { val zookeeperUrl: String = "localhost:2182" val kafkaServerUrl: String = "localhost:9092,localhost:9093" val topic: String = "Topic_X" val groupId: String = "Consumer_Group_A" val eventType: String = "delete" val deleteRetention: Int = 5240000 val producerConfig = ProducerService.createProducerConfig(zookeeperUrl, kafkaServerUrl, topic, groupId, deleteRetention.toString) val producer = new KafkaProducer[String, String](producerConfig) val producerPayload = ProducerPayload("{\"batch_id\":\"" + UUID.randomUUID().toString() + "\", \"document_id\":\"" + UUID.randomUUID().toString() + "\", \"type\":\"" + eventType + "\"}", "", topic, groupId, deleteRetention) ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl, producerPayload) logger.info("Done.") } Here is my consumer code: def consumeMessage = Action { implicit rs => val zookeeperUrl: String = "localhost:2182" val kafkaServerUrl: String = "localhost:9092,localhost:9093" val topic: String = "Topic_X" val groupId: String = "Consumer_Group_A" val config = ConsumerService.createConsumerConfig(zookeeperUrl, kafkaServerUrl, groupId) val consumer = kafka.consumer.Consumer.create(config) val consumerMap = consumer.createMessageStreams(Map(topic -> 1)) val streams = consumerMap.get(topic).get val it = streams(0).iterator() while (it.hasNext()) { val msg = new String(it.next().message()) logger.info(s"Message successfully consumed from topic ${topic} => " + msg) } consumer.shutdown() logger.info("Done.") Ok } Your help is much appreciated. Thank you*Sincerely yours,* *Rico Nodalo Lugod* Senior Java / J2EE / SOA - Developer Cebu City, Philippines 6000 Email: rnl2...@gmail.com