Rico,

Every consumer group sees all messages for the topics it is subscribed
to. If you want to filter these messages by consumer group, you need to
do that filtering yourself after the consumer returns the messages. To
do so, include enough information in each message to make the decision
(e.g. metadata indicating which consumer group the message is intended
for, although if you are doing that it might be better to simply use
different topics for the two sets of messages).
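
As a rough illustration of the client-side filtering described above, here
is a minimal sketch in plain Scala. It is not a Kafka API: the `Envelope`
type, the `targetGroup` field, the `GroupFilter` object and the
"<targetGroup>|<payload>" wire format are all assumptions made up for this
example. The producer would encode the intended group into each message,
and each consumer would drop whatever is not addressed to its own group.

```scala
// Hypothetical envelope: `targetGroup` is an assumed metadata field,
// not something Kafka provides or interprets.
final case class Envelope(targetGroup: String, payload: String)

object GroupFilter {
  // Assumed wire format for this sketch: "<targetGroup>|<payload>".
  def encode(e: Envelope): String = s"${e.targetGroup}|${e.payload}"

  // Split on the first '|' only, so the payload itself may contain '|'.
  def decode(raw: String): Option[Envelope] =
    raw.split("\\|", 2) match {
      case Array(group, payload) => Some(Envelope(group, payload))
      case _                     => None // no separator: treated as malformed and skipped
    }

  // Keep only the messages addressed to `groupId`; the rest are
  // discarded in the application after the consumer has returned them.
  def forGroup(groupId: String, raw: Iterator[String]): Iterator[Envelope] =
    raw.flatMap(decode).filter(_.targetGroup == groupId)
}
```

In a consumer loop like the one in your code, the decoded message strings
would be passed through `GroupFilter.forGroup(groupId, ...)` before being
processed. Note that every group still reads every message from the topic,
so this costs network and CPU on each consumer; with one topic per set of
messages no filtering is needed at all.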

-Ewen

On Fri, Dec 16, 2016 at 12:28 AM, Rico Lugod <rnl2...@gmail.com> wrote:

> Hi Guys,
>
> Good day!
>
> I have a question about how to consume only the messages that belong to a
> specific consumer group.
>
> Here's the scenario:
>
> Publish message "A" to topic "X"
> Consume by Consumer Group A the message "A" from topic "X"
>
> Publish message "B" to topic "X"
> Consume by Consumer Group B the message "B" from topic "X"
>
> How can I achieve this in Scala? At the moment both consumers consume the
> message even if it is assigned a groupId.
>
> Here is my producer code:
>
> def main(args: Array[String]): Unit = {
>   val zookeeperUrl: String = "localhost:2182"
>   val kafkaServerUrl: String = "localhost:9092,localhost:9093"
>   val topic: String = "Topic_X"
>   val groupId: String = "Consumer_Group_A"
>   val eventType: String = "delete"
>   val deleteRetention: Int = 5240000
>   val producerConfig =
> ProducerService.createProducerConfig(zookeeperUrl, kafkaServerUrl,
> topic, groupId, deleteRetention.toString)
>   val producer = new KafkaProducer[String, String](producerConfig)
>   val producerPayload = ProducerPayload("{\"batch_id\":\"" +
> UUID.randomUUID().toString()
>     + "\", \"document_id\":\"" + UUID.randomUUID().toString()
>     + "\", \"type\":\"" + eventType
>     + "\"}", "", topic, groupId, deleteRetention)
>   ProducerService.sendMessage(producer, zookeeperUrl, kafkaServerUrl,
> producerPayload)
>   logger.info("Done.")
> }
>
>
> Here is my consumer code:
>
> def consumeMessage = Action { implicit rs =>
>   val zookeeperUrl: String = "localhost:2182"
>   val kafkaServerUrl: String = "localhost:9092,localhost:9093"
>   val topic: String = "Topic_X"
>   val groupId: String = "Consumer_Group_A"
>   val config = ConsumerService.createConsumerConfig(zookeeperUrl,
> kafkaServerUrl, groupId)
>   val consumer = kafka.consumer.Consumer.create(config)
>   val consumerMap = consumer.createMessageStreams(Map(topic -> 1))
>   val streams = consumerMap.get(topic).get
>   val it = streams(0).iterator()
>   while (it.hasNext()) {
>     val msg = new String(it.next().message())
>     logger.info(s"Message successfully consumed from topic ${topic} => "
> + msg)
>   }
>   consumer.shutdown()
>   logger.info("Done.")
>   Ok
> }
>
>
>
>
> Your help is much appreciated. Thank you.
>
> *Sincerely yours,*
>
>
> *Rico Nodalo Lugod*
> Senior Java / J2EE / SOA - Developer
> Cebu City, Philippines 6000
>
> Email: rnl2...@gmail.com
>
