Hi Marcus,
Your first understanding is correct, provided each “consumer” means a
“consumer thread”: with 10 consumer threads in the same group and 10
partitions, each thread gets exactly one partition, and any threads
beyond the partition count would sit idle with nothing assigned.
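To make that concrete, here is a minimal sketch (the broker address is a
placeholder, not from your setup) of what I mean by one consumer per
thread: every thread creates its own KafkaConsumer joined to the same
group, and the group coordinator then splits the partitions between the
instances.

----
import threading
import time

from kafka import KafkaConsumer


def consume(thread_id):
    # Each thread owns its own KafkaConsumer; kafka-python consumers
    # are not thread-safe, so an instance must never be shared across
    # threads. All instances join the same group_id, so the group
    # coordinator divides the topic's partitions between them.
    consumer = KafkaConsumer(
        'cb-request',
        bootstrap_servers='localhost:9092',  # placeholder address
        group_id='cb-request-group',
    )
    for message in consumer:
        print(thread_id, message.partition, message.value)


for i in range(10):
    threading.Thread(target=consume, args=(i,), daemon=True).start()

time.sleep(60)  # let the daemon threads consume for a while
----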

IMO, your second understanding about message distribution is incorrect,
because each consumer has a max.poll.records setting, which is 500 by
default, and the time between two polls is also very small (a few
milliseconds). With only 99 messages in the topic, a single poll can
return all of them, so one consumer can drain the topic before the
others get anything. That's why this is happening.
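If you want to see the effect of that setting, here is a minimal sketch
(placeholder broker address; max_poll_records=10 is an arbitrary value
for the experiment) of lowering max_poll_records in kafka-python so that
a single poll cannot return the whole topic at once:

----
from kafka import KafkaConsumer

# Sketch: cap a single poll() at 10 records instead of the default 500.
consumer = KafkaConsumer(
    'cb-request',
    bootstrap_servers='localhost:9092',  # placeholder address
    group_id='cb-request-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    max_poll_records=10,  # default is 500
)

raw_messages = consumer.poll(timeout_ms=10000)
for topic_partition, message_list in raw_messages.items():
    print(topic_partition, len(message_list))
consumer.close()
----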

You may need to try this with a much larger number of messages so that
the other partitions actually receive data; see the sketch below.
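For example (again only a sketch: the broker address is a placeholder
and 100000 is an arbitrary count), write a lot of keyless messages and
then check which partitions the topic actually has:

----
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder
for count in range(100000):  # arbitrary large count for the test
    producer.send('cb-request', 'Message {0}'.format(count).encode())
producer.flush()
producer.close()

# Sanity check: list the partition ids of the topic. If this prints
# {0}, the topic only has a single partition, and no assignment
# strategy can spread the load across consumers.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # placeholder
print(consumer.partitions_for_topic('cb-request'))
consumer.close()
----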

I tried my best to participate in the discussion; I am not an expert, though 😊




On Thu, 1 Jul 2021 at 2:53 AM, Marcus Schäfer <marcus.schae...@gmail.com>
wrote:

> Hi,
>
> I was reading through the docs and would like to make use
> of Kafka's dynamic partition assignment feature in a way
> that partitions get distributed evenly across all consumers.
> To my understanding the RoundRobinAssignor would be sufficient,
> and I have left aside for the moment the problem of rebalancing
> when a consumer dies.
>
> If I understood it correctly, each consumer is associated with at
> least one partition. This means that if you have e.g. 10 consumers,
> you need at least 10 partitions for dynamic partition assignment
> to be possible. Is this correct?
>
> Given my understanding is correct, if there are 1000 messages
> written to topic "A" and 10 consumers using the RoundRobinAssignor,
> I would expect 100 messages to be consumed by each consumer.
> Is this still correct?
>
> Based on this I set up Kafka in Amazon using Amazon MSK and
> went through the manual setup procedure so that there is a
> chance to influence the default settings. I set up the cluster
> with a maximum of 100 partitions.
>
> Next I wrote Python code using the kafka-python module (v2.0.2)
> to prove that I understood the concept before starting the
> real implementation.
>
> My producer does:
>
> write.py
> ----
> from kafka import KafkaProducer
>
> producer = KafkaProducer(
>     bootstrap_servers='...9092'
> )
> for count in range(1,100):
>     producer.send('cb-request', 'Message {0}'.format(count).encode())
>
> producer.flush()
> ----
>
>
> My consumer does:
>
> read.py
> ----
> from kafka import KafkaConsumer
> from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
>
> consumer = KafkaConsumer(
>     'cb-request',
>     partition_assignment_strategy=[RoundRobinPartitionAssignor],
>     auto_offset_reset='earliest',
>     enable_auto_commit=False,
>     bootstrap_servers='...9092',
>     group_id='cb-request-group'
> )
>
> try:
>     while(True):
>         raw_messages = consumer.poll(timeout_ms=10000)
>         for topic_partition, message_list in raw_messages.items():
>             print(topic_partition)
>             print(message_list)
>         consumer.commit()
> finally:
>     consumer.close()
> ----
>
>
> Now when I run this, it works in the sense that all produced
> messages are delivered to a consumer, but it never scales: if I
> run multiple read.py instances, there is always only one that
> gets all the messages.
>
> The messages look like this:
>
> ConsumerRecord(topic='cb-request', partition=0, offset=3964,
> timestamp=1625085873665, timestamp_type=0, key=None, value=b'Message 99',
> headers=[], checksum=None, serialized_key_size=-1,
> serialized_value_size=10, serialized_header_size=-1)
>
> and the partition assignment is always "partition=0".
>
> I think I'm doing it wrong, or I have misunderstood the concepts.
>
> Thus I'm writing here to kindly ask for help, or to be put right.
>
> Thanks much
>
> Regards,
> Marcus
> --
>  Public Key available via: https://keybase.io/marcus_schaefer/key.asc
>  keybase search marcus_schaefer
>  -------------------------------------------------------
>  Marcus Schäfer                 Am Unterösch 9
>  Tel: +49 7562 905437           D-88316 Isny / Rohrdorf
>  Germany
>  -------------------------------------------------------
>
