Hi Marcus,

Your first understanding is correct, provided each "consumer" means a "consumer thread": with 10 consumer threads in one group you need at least 10 partitions so that every thread can get a partition assigned.

IMO, the second understanding about message distribution is incorrect, because there is a setting called max.poll.records for each consumer. It is 500 by default, and the time between two polls is also very small, just a few milliseconds. That is why this is happening: a single consumer can fetch all of the available messages before the other consumers receive anything. You may need to try this with a much bigger number of messages so that the other partitions get assigned. I tried my best to participate in the discussion; I am not an expert though 😊
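If you want to experiment, here is a rough, untested sketch of what I mean, based on your read.py. The bootstrap server stays a placeholder like in your example, and max_poll_records is the kafka-python name for the max.poll.records setting:

----
from kafka import KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# Same consumer as in read.py, but with a small max_poll_records so
# that a single poll cannot drain the whole topic in one go.
consumer = KafkaConsumer(
    'cb-request',
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    bootstrap_servers='...9092',  # placeholder, as in your example
    group_id='cb-request-group',
    max_poll_records=10,          # default is 500
)

# Diagnostic: how many partitions does the topic actually have?
print(consumer.partitions_for_topic('cb-request'))

# Poll once so the consumer joins the group, then print which
# partitions this particular instance was assigned.
consumer.poll(timeout_ms=10000)
print(consumer.assignment())

consumer.close()
----

If you start several copies of this and each one prints a different subset of partitions, the RoundRobinAssignor is working, and the message distribution should show up once enough messages are produced.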
On Thu, 1 Jul 2021 at 2:53 AM, Marcus Schäfer <marcus.schae...@gmail.com> wrote:

> Hi,
>
> I was reading through the docs and would like to make use of kafka's
> dynamic partition assignment feature in a way that partitions get
> distributed evenly across all consumers. To my understanding the
> RoundRobinAssignor would be sufficient, and I left the problem of
> rebalancing if a consumer dies aside for the moment.
>
> If I understood it correctly, consumers are associated with at least
> one partition. This means if you have e.g. 10 consumers you need at
> least 10 partitions such that the dynamic partition assignment is
> possible. Is this correct?
>
> Given my understanding is correct and there are 1000 messages written
> to topic "A" and 10 consumers using the RoundRobinAssignor, I would
> expect 100 messages to be consumed by each consumer. Is this still
> correct?
>
> Based on this I set up kafka in Amazon using Amazon MSK and went
> through the manual setup procedure such that there is a chance to
> influence the default settings. I set up the cluster with a max of
> 100 partitions.
>
> Next I wrote python code using the kafka-python module (v2.0.2) to
> prove that I understood the concept before starting the real
> implementation.
>
> My producer does:
>
> write.py
> ----
> from kafka import KafkaProducer
>
> producer = KafkaProducer(
>     bootstrap_servers='...9092'
> )
> for count in range(1, 100):
>     producer.send('cb-request', 'Message {0}'.format(count).encode())
>
> producer.flush()
> ----
>
> My consumer does:
>
> read.py
> ----
> from kafka import KafkaConsumer
> from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
>
> consumer = KafkaConsumer(
>     'cb-request',
>     partition_assignment_strategy=[RoundRobinPartitionAssignor],
>     auto_offset_reset='earliest',
>     enable_auto_commit=False,
>     bootstrap_servers='...9092',
>     group_id='cb-request-group'
> )
>
> try:
>     while True:
>         raw_messages = consumer.poll(timeout_ms=10000)
>         for topic_partition, message_list in raw_messages.items():
>             print(topic_partition)
>             print(message_list)
>         consumer.commit()
> finally:
>     consumer.close()
> ----
>
> Now if I run this it works in the way that all produced messages are
> delivered to a consumer, but it never scales. Meaning if I run
> multiple read.py's, there is always only one that gets all the
> messages.
>
> The messages look like this:
>
> ConsumerRecord(topic='cb-request', partition=0, offset=3964,
> timestamp=1625085873665, timestamp_type=0, key=None, value=b'Message 99',
> headers=[], checksum=None, serialized_key_size=-1,
> serialized_value_size=10, serialized_header_size=-1)
>
> and the partition assignment is always "partition=0".
>
> I think I'm doing it wrong or I misunderstood the concepts.
> Thus I'm writing here and kindly ask for help or to put me right.
>
> Thanks much
>
> Regards,
> Marcus
>
> --
> Public Key available via: https://keybase.io/marcus_schaefer/key.asc
> keybase search marcus_schaefer
> -------------------------------------------------------
> Marcus Schäfer                 Am Unterösch 9
> Tel: +49 7562 905437           D-88316 Isny / Rohrdorf
>                                Germany
> -------------------------------------------------------