Hi,

> Your first understanding is correct, provided each “consumer” means a
> “consumer thread”

All right, thanks.

> IMO, Second understanding about message distribution is incorrect because
> there is something called as max poll records for each consumer. Its 500 by
> default. And the time between 2 polls is also very small in few milliseconds.
> Thats why this is happening.
>
> You may need to try this on a big number of messages so that other
> partitions get assigned.

Ah ok, so you are saying the partition assignment depends on the load of
the topic? This was new to me, as I thought Kafka distributes the messages
between the active consumers independent of the amount of data. Is there
documentation available that explains how this correlates?

If one poll() fetches them all, it's clear to me that there is not much
left to distribute, and if the subsequent poll() happens right after the
former one, I can imagine it stays at the same partition. Thanks for
pointing this out. So I changed the code and used:

    max_poll_records=1

That actually means one poll() for each message. I added a sleep time of
1sec between the polls and started two consumers (read.py) with some delay,
such that there are a number of poll()'s from different consumers with
different timing.

From this situation I tested producing the following numbers of messages:

    20
    200
    2000
    200000

There was no case in which messages got distributed between the two
consumers. It was always one of them receiving all messages. If you close
one of the consumers, the other one continues receiving the messages.

I must admit I did not wait forever (1sec between polls is long :-)) but
I also think it wouldn't have changed while processing.

> I tried my best to participate in discussion I am not expert though😊

Thanks much for looking into this. I still think something is wrong in
either my testing/code or on the cluster.
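
To make the expectation above concrete, here is a minimal sketch of how a
range-style assignment would split a topic between group members. It is plain
Python and not the kafka-python or broker implementation. It assumes both
readers are in the same consumer group, that the topic has 20 partitions, and
that a range-style strategy is in use. The function name range_assign and the
member names reader-a and reader-b are hypothetical. The point it illustrates:
the split depends only on the number of partitions and the group members, not
on how many messages are produced.

```python
def range_assign(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 among consumers, range style.

    Illustrative only: members are sorted by name, every member gets
    num_partitions // len(consumers) partitions, and the first
    num_partitions % len(consumers) members get one extra.
    """
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for index, member in enumerate(members):
        count = base + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# Assumption: a 20-partition topic and two hypothetical group members.
assignment = range_assign(20, ["reader-a", "reader-b"])
for member, partitions in assignment.items():
    print(member, partitions)
```

Under these assumptions each reader would own ten partitions, and a reader
only sees messages that land in the partitions it owns.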

Maybe some details on the Kafka setup help. This is the server setup:

    auto.create.topics.enable=false
    default.replication.factor=3
    min.insync.replicas=2
    num.io.threads=8
    num.network.threads=5
    num.partitions=20
    num.replica.fetchers=2
    replica.lag.time.max.ms=30000
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    socket.send.buffer.bytes=102400
    unclean.leader.election.enable=true
    zookeeper.session.timeout.ms=18000

This is the Kafka version:

    "CurrentBrokerSoftwareInfo": {
        "KafkaVersion": "2.6.1"
    }

Do you see any setting that impacts the dynamic partition assignment?

I was reading about the api_version exchange and that it can impact the
availability of features:

    (0, 9) enables full group coordination features with automatic
    partition assignment and rebalancing,
    (0, 8, 2) enables kafka-storage offset commits with manual
    partition assignment only,
    (0, 8, 1) enables zookeeper-storage offset commits with manual
    partition assignment only,
    (0, 8, 0) enables basic functionality but requires manual
    partition assignment and offset management.

I was not able to find out which api_version the server as set up by
Amazon MSK is talking, though.

Thanks

Regards,
Marcus
-- 
 Public Key available via: https://keybase.io/marcus_schaefer/key.asc
 keybase search marcus_schaefer
 -------------------------------------------------------
 Marcus Schäfer                 Am Unterösch 9
 Tel: +49 7562 905437           D-88316 Isny / Rohrdorf
                                Germany
 -------------------------------------------------------