How many partitions does the topic have? You need at least 2 partitions to distribute messages across two consumers in the same group.
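
To illustrate why the partition count matters, here is a minimal sketch of handing partitions to consumers in circular order. It is not Kafka's RoundRobinAssignor, only a simplified imitation of the idea; the class name RoundRobinSketch, the method assign, and the consumer names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: this is NOT Kafka's RoundRobinAssignor.
class RoundRobinSketch {

    // Hands partitions 0..partitionCount-1 to the consumers in circular order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        // One partition: only consumer-1 owns anything, so it receives every message.
        System.out.println(assign(consumers, 1));
        // Two partitions: each consumer owns one partition.
        System.out.println(assign(consumers, 2));
    }
}
```

Within a consumer group, each partition is owned by exactly one consumer, so with a single partition the second consumer stays idle no matter which assignment strategy is configured.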

On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshr...@cisco.com> wrote:

> Hi,
>
> I have 5 separate Docker images: 1 for the Kafka broker, 1 for ZooKeeper,
> 1 producer, and 2 consumers.
>
> I publish messages to the topic via the producer.
>
> Basically, I would like the messages to be published in a round-robin
> fashion, so for that purpose I defined the consumers with the same group.id
> and set the partition.assignment.strategy config to
> org.apache.kafka.clients.consumer.RoundRobinAssignor,
> but actually only 1 consumer receives all the messages.
>
> My Producer Code:
>
> public class DiscoveryKafkaProducer {
>     Producer<String, String> producer;
>
>     public DiscoveryKafkaProducer(Properties configs) {
>         producer = new KafkaProducer<String, String>(configs);
>     }
>
>     public void send(String topic, List<String> records) {
>         for (String record : records) {
>             producer.send(new ProducerRecord<String, String>(topic, record));
>         }
>         producer.flush();
>     }
>
>     public void close() throws Exception {
>         producer.close();
>     }
> }
>
> My Consumer Code:
>
> public static void main(String[] args) {
>     String server = "lshraga-ubuntu-sp-nac:9092";
>     Properties consumerConfigs = new Properties();
>     consumerConfigs.put("bootstrap.servers", server);
>     consumerConfigs.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     consumerConfigs.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     consumerConfigs.put("group.id", "discovery");
>     consumerConfigs.put("client.id", "discovery");
>     consumerConfigs.put("partition.assignment.strategy",
>         "org.apache.kafka.clients.consumer.RoundRobinAssignor");
>     List<String> list = new ArrayList<String>();
>     DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(
>         Collections.singletonList(topicName), consumerConfigs);
>
>     try {
>         while (true) {
>             System.out.println("Start to consume");
>             consumer1.poll(1000L);
>         }
>     } catch (InterruptedException e) {
>         // TODO Auto-generated catch block
>         e.printStackTrace();
>     }
> }
>
> public class DiscoveryKafkaConsumer {
>     Consumer<String, String> consumer;
>     Integer id;
>
>     public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
>         consumer = new KafkaConsumer<String, String>(configs);
>         consumer.subscribe(topics);
>     }
>
>     public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
>         consumer = new KafkaConsumer<String, String>(configs);
>         consumer.subscribe(topics);
>         this.id = i;
>     }
>
>     public void poll(long timeout) throws InterruptedException {
>         ConsumerRecords<String, String> records = consumer.poll(timeout);
>         System.out.println("Hey!Consumer #" + id + "got records:" + records);
>         Map<String, List<String>> results = new HashMap<String, List<String>>();
>         records.forEach((cr) -> {
>             System.out.println("cr.topic()=" + cr.topic());
>             List<String> list = results.get(cr.topic());
>             if (list == null) {
>                 list = new ArrayList<>();
>                 results.put(cr.topic(), list);
>             }
>             list.add(cr.value());
>             System.out.println("list=" + list);
>         });
>     }
>
>     public void close() throws Exception {
>         consumer.close();
>     }
> }
>
> What do I need to add/configure in order to consume the messages in a
> round-robin fashion?
>
> Thanks,
>
> Liel Shraga