How many partitions does the topic have? You need at least 2 partitions to
distribute messages across two consumers in the same group.
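
To illustrate why the partition count matters, here is a minimal sketch of how partitions are dealt out to the members of one consumer group. It is a simplified model written for this reply, not the source of Kafka's RoundRobinAssignor; the class name RoundRobinSketch, the method assign, and the consumer names are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of round-robin partition assignment inside one consumer group.
// Assumption: this is an illustration only, not Kafka's actual assignor code.
class RoundRobinSketch {

    // Deal partitions 0..partitionCount-1 to the consumers one at a time, in order.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer-1", "consumer-2");

        // One partition: the second consumer has nothing to read.
        System.out.println(assign(1, group)); // {consumer-1=[0], consumer-2=[]}

        // Two partitions: each consumer owns one.
        System.out.println(assign(2, group)); // {consumer-1=[0], consumer-2=[1]}
    }
}
```

A partition is owned by exactly one consumer in a group at a time, so with a single partition the assignment strategy has nothing to spread out, whichever assignor is configured.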


On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshr...@cisco.com>
wrote:

> Hi,
>
>
>
> I have 5 separate docker images : 1 for kafka broker, 1 zookeeper , 1
> producer and 2 consumers.
>
> I publish messages to the topic via the producer.
>
> Basically, I would like the messages to be delivered to the consumers in a
> round-robin fashion,
>
> so for that purpose I defined the consumers with the same group.id and
> added config of partition.assignment.strategy to be
> org.apache.kafka.clients.consumer.RoundRobinAssignor,
>
> but actually only 1 consumer receives all the messages.
>
> *My Producer Code: *
>
> public class DiscoveryKafkaProducer {
>
>     Producer<String, String> producer;
>
>     public DiscoveryKafkaProducer(Properties configs) {
>         producer = new KafkaProducer<String, String>(configs);
>     }
>
>     public void send(String topic, List<String> records) {
>         for (String record : records) {
>             producer.send(new ProducerRecord<String, String>(topic, record));
>         }
>         producer.flush();
>     }
>
>     public void close() throws Exception {
>         producer.close();
>     }
> }
>
>
>
> *My Consumer Code:*
>
> public static void main(String[] args) {
>     String server = "lshraga-ubuntu-sp-nac:9092";
>
>     Properties consumerConfigs = new Properties();
>     consumerConfigs.put("bootstrap.servers", server);
>     consumerConfigs.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     consumerConfigs.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     consumerConfigs.put("group.id", "discovery");
>     consumerConfigs.put("client.id", "discovery");
>     consumerConfigs.put("partition.assignment.strategy",
>             "org.apache.kafka.clients.consumer.RoundRobinAssignor");
>
>     List<String> list = new ArrayList<String>();
>
>     // topicName is not defined in this snippet.
>     DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(
>             Collections.singletonList(topicName), consumerConfigs);
>
>     try {
>         while (true) {
>             System.out.println("Start to consume");
>             consumer1.poll(1000L);
>         }
>     } catch (InterruptedException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>     }
> }
>
>
>
> public class DiscoveryKafkaConsumer {
>
>     Consumer<String, String> consumer;
>
>     Integer id;
>
>     public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
>         consumer = new KafkaConsumer<String, String>(configs);
>         consumer.subscribe(topics);
>     }
>
>     public DiscoveryKafkaConsumer(int i, List<String> topics,
>             Properties configs) {
>         consumer = new KafkaConsumer<String, String>(configs);
>         consumer.subscribe(topics);
>         this.id = i;
>     }
>
>     public void poll(long timeout) throws InterruptedException {
>         ConsumerRecords<String, String> records = consumer.poll(timeout);
>         System.out.println("Hey! Consumer #" + id + " got records: " + records);
>
>         // Group the received values by topic.
>         Map<String, List<String>> results = new HashMap<String, List<String>>();
>         records.forEach((cr) -> {
>             System.out.println("cr.topic()=" + cr.topic());
>             List<String> list = results.get(cr.topic());
>             if (list == null) {
>                 list = new ArrayList<>();
>                 results.put(cr.topic(), list);
>             }
>             list.add(cr.value());
>             System.out.println("list=" + list);
>         });
>     }
>
>     public void close() throws Exception {
>         consumer.close();
>     }
> }
>
>
> *What do I need to add or configure in order to consume the messages in a
> round-robin fashion?*
>
>
>
>
>
> Thanks,
>
>
>
>
>
>
>
> *Liel Shraga*
>
> ENGINEER.SOFTWARE ENGINEERING
>
> lshr...@cisco.com
>
> Tel: *+972 2 588 6394*
>
> *Cisco Systems, Inc.*
>
> 32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
>
>
>
>
