Does it mean that the number of brokers changes from time to time?

On Mon, Sep 18, 2017 at 11:10 PM, Liel Shraga (lshraga) <lshr...@cisco.com> wrote:
> Hi,
>
> My docker compose file is:
>
> version: '2'
> services:
>   zookeeper:
>     image: wurstmeister/zookeeper
>     ports:
>       - "2181:2181"
>   kafka:
>     image: wurstmeister/kafka
>     ports:
>       - "9092:9092"
>     environment:
>       KAFKA_ADVERTISED_HOST_NAME: lshraga-ubuntu-sp-nac
>       KAFKA_ADVERTISED_PORT: 9092
>       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
>       KAFKA_NUM_PARTITIONS: 10
>     volumes:
>       - /var/run/docker.sock:/var/run/docker.sock
>
> Basically, I increased KAFKA_NUM_PARTITIONS to 10, but I need to set it
> dynamically, since I have several microservices that are consumers, and
> their number grows and shrinks dynamically at runtime.
> Is there a way to change the number of partitions dynamically via Java code?
>
> Thanks,
>
> Liel Shraga
> ENGINEER.SOFTWARE ENGINEERING
> lshr...@cisco.com
> Tel: +972 2 588 6394
> Cisco Systems, Inc.
> 32 HaMelacha St., (HaSharon Bldg) P.O.Box 8735, I.Z.Sapir
> SOUTH NETANYA
> 42504
> Israel
> cisco.com
>
> -----Original Message-----
> From: Amir Shahinpour [mailto:a...@holisticlabs.net]
> Sent: Tuesday, September 19, 2017 9:07 AM
> To: users@kafka.apache.org
> Subject: Re: Round Robin for several consumers in KAFKA
>
> Can you provide your docker file, or compose file?
>
> On Wed, Sep 13, 2017 at 1:55 AM, Liel Shraga (lshraga) <lshr...@cisco.com>
> wrote:
>
> > Hi,
> >
> > I didn’t define the number of partitions. How can I do it with the
> > kafka-clients API?
> >
> > Thanks,
> > Liel Shraga
> >
> > -----Original Message-----
> > From: Manikumar [mailto:manikumar.re...@gmail.com]
> > Sent: Wednesday, September 13, 2017 11:47 AM
> > To: users@kafka.apache.org
> > Subject: Re: Round Robin for several consumers in KAFKA
> >
> > What is the partition count? You need at least 2 partitions to
> > distribute messages across two consumers.
> >
> > On Wed, Sep 13, 2017 at 1:24 PM, Liel Shraga (lshraga) <lshr...@cisco.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I have 5 separate docker images: 1 for the Kafka broker, 1 for
> > > ZooKeeper, 1 producer, and 2 consumers.
> > >
> > > I publish messages to the topic via the producer.
> > >
> > > Basically, I would like the messages to be distributed to the
> > > consumers in round-robin fashion, so for that purpose I defined the
> > > consumers with the same group.id and set
> > > partition.assignment.strategy to
> > > org.apache.kafka.clients.consumer.RoundRobinAssignor,
> > > but actually only 1 consumer receives all the messages.
> > >
> > > My Producer Code:
> > >
> > > public class DiscoveryKafkaProducer {
> > >
> > >     Producer<String, String> producer;
> > >
> > >     public DiscoveryKafkaProducer(Properties configs) {
> > >         producer = new KafkaProducer<String, String>(configs);
> > >     }
> > >
> > >     public void send(String topic, List<String> records) {
> > >         for (String record : records) {
> > >             producer.send(new ProducerRecord<String, String>(topic, record));
> > >         }
> > >         producer.flush();
> > >     }
> > >
> > >     public void close() throws Exception {
> > >         producer.close();
> > >     }
> > > }
> > >
> > > My Consumer Code:
> > >
> > > public static void main(String[] args) {
> > >     String server = "lshraga-ubuntu-sp-nac:9092";
> > >     Properties consumerConfigs = new Properties();
> > >     consumerConfigs.put("bootstrap.servers", server);
> > >     consumerConfigs.put("key.deserializer",
> > >             "org.apache.kafka.common.serialization.StringDeserializer");
> > >     consumerConfigs.put("value.deserializer",
> > >             "org.apache.kafka.common.serialization.StringDeserializer");
> > >     consumerConfigs.put("group.id", "discovery");
> > >     consumerConfigs.put("client.id", "discovery");
> > >     consumerConfigs.put("partition.assignment.strategy",
> > >             "org.apache.kafka.clients.consumer.RoundRobinAssignor");
> > >     List<String> list = new ArrayList<String>();
> > >     DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(
> > >             Collections.singletonList(topicName), consumerConfigs);
> > >
> > >     try {
> > >         while (true) {
> > >             System.out.println("Start to consume");
> > >             consumer1.poll(1000L);
> > >         }
> > >     } catch (InterruptedException e) {
> > >         // TODO Auto-generated catch block
> > >         e.printStackTrace();
> > >     }
> > > }
> > >
> > > public class DiscoveryKafkaConsumer {
> > >
> > >     Consumer<String, String> consumer;
> > >     Integer id;
> > >
> > >     public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
> > >         consumer = new KafkaConsumer<String, String>(configs);
> > >         consumer.subscribe(topics);
> > >     }
> > >
> > >     public DiscoveryKafkaConsumer(int i, List<String> topics,
> > >             Properties configs) {
> > >         consumer = new KafkaConsumer<String, String>(configs);
> > >         consumer.subscribe(topics);
> > >         this.id = i;
> > >     }
> > >
> > >     public void poll(long timeout) throws InterruptedException {
> > >         ConsumerRecords<String, String> records = consumer.poll(timeout);
> > >         System.out.println("Hey!Consumer #" + id + "got records:" + records);
> > >         // Group the received values by topic.
> > >         Map<String, List<String>> results = new HashMap<String, List<String>>();
> > >         records.forEach((cr) -> {
> > >             System.out.println("cr.topic()=" + cr.topic());
> > >             List<String> list = results.get(cr.topic());
> > >             if (list == null) {
> > >                 list = new ArrayList<>();
> > >                 results.put(cr.topic(), list);
> > >             }
> > >             list.add(cr.value());
> > >             System.out.println("list=" + list);
> > >         });
> > >     }
> > >
> > >     public void close() throws Exception {
> > >         consumer.close();
> > >     }
> > > }
> > >
> > > What do I need to add or configure in order to consume the messages
> > > in round-robin fashion?
> > >
> > > Thanks,
> > > Liel Shraga
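
To illustrate the point made in the thread that at least 2 partitions are needed to spread messages across two consumers, here is a minimal sketch of round-robin assignment within one consumer group. It is a simplified model in plain Java, not Kafka's actual RoundRobinAssignor; the class name RoundRobinSketch, the method assign, and the consumer ids are hypothetical and chosen only for illustration. It assumes every consumer subscribes to the same single topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of round-robin partition assignment in one consumer group.
// Hypothetical names; this is not Kafka's RoundRobinAssignor implementation.
final class RoundRobinSketch {

    // Deals partitions 0..partitionCount-1 to the consumers, taken in sorted
    // order of their ids, one partition at a time.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int partitionCount) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String id : consumerIds) {
            assignment.put(id, new ArrayList<>());
        }
        List<String> ordered = new ArrayList<>(assignment.keySet());
        if (ordered.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(ordered.get(p % ordered.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        // One partition: the second consumer is assigned nothing.
        System.out.println(assign(consumers, 1));
        // Ten partitions: both consumers are assigned five partitions each.
        System.out.println(assign(consumers, 10));
    }
}
```

In this model a partition is owned by exactly one consumer of the group, so with a single partition the second consumer stays idle no matter which assignment strategy is configured. With 10 partitions, as in the compose file quoted above, both consumers receive a share.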