Can you provide your Dockerfile or Compose file?

On Wed, Sep 13, 2017 at 1:55 AM, Liel Shraga (lshraga) <lshr...@cisco.com> wrote:
> Hi,
>
> I didn't define the number of partitions. How can I do it with the
> kafka-clients API?
>
> Thanks,
>
> Liel Shraga
> ENGINEER.SOFTWARE ENGINEERING
> lshr...@cisco.com
> Tel: +972 2 588 6394
> Cisco Systems, Inc.
> 32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
> -----Original Message-----
> From: Manikumar [mailto:manikumar.re...@gmail.com]
> Sent: Wednesday, September 13, 2017 11:47 AM
> To: users@kafka.apache.org
> Subject: Re: Round Robin for several consumers in KAFKA
>
> What is the partition count? You need at least 2 partitions to distribute
> across two consumers.
>
> On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshr...@cisco.com> wrote:
>
> > Hi,
> >
> > I have 5 separate Docker images: 1 for the Kafka broker, 1 for ZooKeeper,
> > 1 producer and 2 consumers.
> >
> > I publish messages to the topic via the producer.
> >
> > Basically, I would like the messages to be published using a round-robin
> > algorithm,
> >
> > so for that purpose I defined the consumers with the same group.id and
> > set partition.assignment.strategy to
> > org.apache.kafka.clients.consumer.RoundRobinAssignor,
> >
> > but actually only 1 consumer receives all the messages.
> >
> > My Producer Code:
> >
> > public class DiscoveryKafkaProducer {
> >     Producer<String, String> producer;
> >
> >     public DiscoveryKafkaProducer(Properties configs) {
> >         producer = new KafkaProducer<String, String>(configs);
> >     }
> >
> >     public void send(String topic, List<String> records) {
> >         for (String record : records) {
> >             producer.send(new ProducerRecord<String, String>(topic, record));
> >         }
> >         producer.flush();
> >     }
> >
> >     public void close() throws Exception {
> >         producer.close();
> >     }
> > }
> >
> > My Consumer Code:
> >
> > public static void main(String[] args) {
> >     String server = "lshraga-ubuntu-sp-nac:9092";
> >     Properties consumerConfigs = new Properties();
> >     consumerConfigs.put("bootstrap.servers", server);
> >     consumerConfigs.put("key.deserializer",
> >         "org.apache.kafka.common.serialization.StringDeserializer");
> >     consumerConfigs.put("value.deserializer",
> >         "org.apache.kafka.common.serialization.StringDeserializer");
> >     consumerConfigs.put("group.id", "discovery");
> >     consumerConfigs.put("client.id", "discovery");
> >     consumerConfigs.put("partition.assignment.strategy",
> >         "org.apache.kafka.clients.consumer.RoundRobinAssignor");
> >     List<String> list = new ArrayList<String>();
> >     DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(
> >         Collections.singletonList(topicName), consumerConfigs);
> >
> >     try {
> >         while (true) {
> >             System.out.println("Start to consume");
> >             consumer1.poll(1000L);
> >         }
> >     } catch (InterruptedException e) {
> >         // TODO Auto-generated catch block
> >         e.printStackTrace();
> >     }
> > }
> >
> > public class DiscoveryKafkaConsumer {
> >     Consumer<String, String> consumer;
> >     Integer id;
> >
> >     public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
> >         consumer = new KafkaConsumer<String, String>(configs);
> >         consumer.subscribe(topics);
> >     }
> >
> >     public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
> >         consumer = new KafkaConsumer<String, String>(configs);
> >         consumer.subscribe(topics);
> >         this.id = i;
> >     }
> >
> >     public void poll(long timeout) throws InterruptedException {
> >         ConsumerRecords<String, String> records = consumer.poll(timeout);
> >         System.out.println("Hey!Consumer #" + id + "got records:" + records);
> >         Map<String, List<String>> results = new HashMap<String, List<String>>();
> >         records.forEach((cr) -> {
> >             System.out.println("cr.topic()=" + cr.topic());
> >             List<String> list = results.get(cr.topic());
> >             if (list == null) {
> >                 list = new ArrayList<>();
> >                 results.put(cr.topic(), list);
> >             }
> >             list.add(cr.value());
> >             System.out.println("list=" + list);
> >         });
> >     }
> >
> >     public void close() throws Exception {
> >         consumer.close();
> >     }
> > }
> >
> > What do I need to add/configure in order to consume the messages in a
> > round-robin fashion?
> >
> > Thanks,
> >
> > Liel Shraga
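
To illustrate the point above about needing at least 2 partitions for two consumers, here is a minimal sketch in plain Java. It is not Kafka's RoundRobinAssignor and does not use the Kafka client at all; the class name RoundRobinSketch and the consumer names are made up for illustration. It only models dealing a topic's partitions out to the members of one consumer group in turn, under the assumption that each partition is owned by exactly one consumer in the group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a plain-Java model of round-robin
// partition assignment, not the real Kafka assignor.
class RoundRobinSketch {

    // Deal partitions 0..numPartitions-1 to the consumers one at a time.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("consumer-1", "consumer-2");
        // One partition: the second consumer is left with nothing to read.
        System.out.println(assign(group, 1)); // {consumer-1=[0], consumer-2=[]}
        // Two partitions: each consumer owns one partition.
        System.out.println(assign(group, 2)); // {consumer-1=[0], consumer-2=[1]}
    }
}
```

If the topic in this setup has a single partition (the broker default for num.partitions is 1, so this is likely when the topic was auto-created, though that is an assumption about this deployment), the first case applies regardless of the assignment strategy. The partition count is normally chosen when the topic is created, for example with the --partitions option of kafka-topics.sh, or through AdminClient.createTopics with a NewTopic in kafka-clients 0.11 and later.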