what is the partition size? you need at least 2 partitions to distribute
across two consumers

On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshr...@cisco.com>

> Hi,
> I have 5 separate docker images : 1 for kafka broker, 1 zookeeper , 1
> producer and 2 consumers.
> I publish messages to the topic via the producer.
> Basically, I would likw that the messages will be published in a round
> robin algorithm,
> so for that purpose I defined the consumers with the same group.id and
> added config of partition.assignment.strategy to be
> org.apache.kafka.clients.consumer.RoundRobinAssignor,
> but actually only 1 consumer receive all the messages.
> *My Producer Code: *
> *public* *class* DiscoveryKafkaProducer{
>    Producer<String, String> producer;
>    *public* DiscoveryKafkaProducer(Properties configs) {
>        producer = *new* KafkaProducer<String, String>(configs);
>    }
>    *public* *void* send(String topic, List<String> records) {
>          *for*(String record: records){
>                producer.send(*new* ProducerRecord<String, String>(topic,
> record));
>          }
>        producer.flush();
>    }
>    *public* *void* close() *throws* Exception {
>        producer.close();
>    }
> }
> *My Consumer Code:*
> *public* *static* *void* main(String[] args) {
>             String server = "lshraga-ubuntu-sp-nac:9092";
>             Properties consumerConfigs = *new* Properties();
>             consumerConfigs.put("bootstrap.servers", server);
>             consumerConfigs.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>             consumerConfigs.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>             consumerConfigs.put("group.id", "discovery");
>             consumerConfigs.put("client.id", "discovery");
>             consumerConfigs.put("partition.assignment.strategy",
> "org.apache.kafka.clients.consumer.RoundRobinAssignor");
>             List<String> *list* = *new* ArrayList<String>();
>             DiscoveryKafkaConsumer consumer1 = *new*
> DiscoveryKafkaConsumer(Collections.*singletonList*(topicName),
> consumerConfigs);
>             *try* {
>                   *while* (*true*) {
>                         System.*out*.println("Start to consume");
>                       consumer1.poll(1000L);
>                   }
>             } *catch* (InterruptedException e) {
>                   // *TODO* Auto-generated catch block
>                   e.printStackTrace();
>             }
> *public* *class* DiscoveryKafkaConsumer {
>    Consumer<String, String> consumer;
>    Integer id;
>    *public* DiscoveryKafkaConsumer(List<String> topics, Properties configs)
> {
>        consumer = *new* KafkaConsumer<String, String>(configs);
>        consumer.subscribe(topics);
>    }
>    *public* DiscoveryKafkaConsumer(*int* i, List<String> topics,
> Properties configs) {
>          consumer = *new* KafkaConsumer<String, String>(configs);
>        consumer.subscribe(topics);
>        *this*.id = i;
> }
>    *public* *void* poll(*long* timeout) *throws* InterruptedException {
>        ConsumerRecords<String,String> records = consumer.poll(timeout);
>        System.*out*.println("Hey!Consumer #" + id + "got records:" +
> records);
>        Map<String, List<String>> results = *new* HashMap<String,
> List<String>>();
>        records.forEach((cr) -> {
>          System.*out*.println("cr.topic()=" + cr.topic());
>            List<String> list = results.get(cr.topic());
>            *if*(list == *null*) {
>                list = *new* ArrayList<>();
>                results.put(cr.topic(), list);
>                }
>                   list.add(cr.value());
>                   System.*out*.println("list=" + list);
>            });
>    }
>    *public* *void* close() *throws* Exception {
>        consumer.close();
>    }
> *What I need to add/condig in order to consume the messages in a Round
> Robin ?*
> Thanks,
