Hi,

> Your first understanding is correct, provided each “consumer” means a
> “consumer thread”

all right, thanks

> IMO, second understanding about message distribution is incorrect because
> there is something called max poll records for each consumer. It's 500 by
> default. And the time between 2 polls is also very small, a few milliseconds.
> That's why this is happening.
> 
> You may need to try this on a big number of messages so that other
> partitions get assigned.

Ah ok, so you are saying the partition assignment depends on the load
of the topic? This was new to me, as I thought Kafka distributes the
messages between the active consumers independent of the amount of
data. Is there documentation available that explains how this
correlates?

If one poll() fetches them all, it's clear to me that there is not
much left to distribute, and if the subsequent poll() happens right
after the previous one, I can imagine it stays on the same partition.
Thanks for pointing this out.

So I changed the code and used:

    max_poll_records=1

That effectively means one poll() per message. I added a sleep of
1 sec between polls and started two consumers (read.py) with some
delay, so that there are a number of poll() calls from different
consumers with different timing.
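
To make this concrete, here is a stripped-down sketch of what the
consumer loop in read.py roughly does (broker address, topic and
group name are placeholders, not the real ones):

    from time import sleep
    from kafka import KafkaConsumer

    # placeholder broker/topic/group names
    consumer = KafkaConsumer(
        'test-topic',
        bootstrap_servers='broker:9092',
        group_id='test-group',
        max_poll_records=1
    )

    while True:
        records = consumer.poll(timeout_ms=1000)
        for partition, messages in records.items():
            for message in messages:
                print(partition, message.value)
        # show which partitions this consumer currently owns
        print(consumer.assignment())
        sleep(1)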

From this setup I tested producing the following numbers of messages:

   20
   200
   2000
   200000

In no case were the messages distributed between the two consumers;
it was always one of them receiving everything. If you close one of
the consumers, the other continues receiving the messages.

I must admit I did not wait forever (1 sec between polls is long :-)),
but I also don't think it would have changed during processing.
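
For completeness, the producer side of the test was essentially just
sending N small messages in a loop, roughly like this sketch (again
with placeholder names):

    from kafka import KafkaProducer

    # placeholder broker/topic names
    producer = KafkaProducer(bootstrap_servers='broker:9092')

    for number in range(200):
        # no key is set, so the default partitioner decides
        # which of the topic partitions each message goes to
        producer.send('test-topic', 'message {0}'.format(number).encode())

    producer.flush()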

> I tried my best to participate in discussion I am not expert though😊

Thanks a lot for looking into this. I still think something is
wrong either in my testing/code or on the cluster. Maybe some
details on the Kafka setup help:

This is the server setup:

    auto.create.topics.enable=false
    default.replication.factor=3
    min.insync.replicas=2
    num.io.threads=8
    num.network.threads=5
    num.partitions=20
    num.replica.fetchers=2
    replica.lag.time.max.ms=30000
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    socket.send.buffer.bytes=102400
    unclean.leader.election.enable=true
    zookeeper.session.timeout.ms=18000

This is the Kafka version:

    "CurrentBrokerSoftwareInfo": {
        "KafkaVersion": "2.6.1"
    }


Do you see any setting that impacts the dynamic partition assignment?

I was reading about the api_version exchange and that it can
impact the availability of features:

    (0, 9) enables full group coordination features with automatic
           partition assignment and rebalancing,
    (0, 8, 2) enables kafka-storage offset commits with manual
           partition assignment only,
    (0, 8, 1) enables zookeeper-storage offset commits with manual
           partition assignment only,
    (0, 8, 0) enables basic functionality but requires manual
           partition assignment and offset management.


I was not able to find out which api_version the server, as set up
by Amazon MSK, is speaking, though.
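
One thing I could still try is to not pin api_version at all and let
kafka-python probe the broker on connect, then print what it
negotiated. Something like the following sketch (I am not sure
consumer.config is meant as a public attribute, but it appears to
hold the probed version; names are placeholders again):

    from kafka import KafkaConsumer

    # placeholder broker/topic names
    consumer = KafkaConsumer(
        'test-topic',
        bootstrap_servers='broker:9092',
        api_version=None  # default: probe the broker version on connect
    )
    print(consumer.config['api_version'])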

Thanks

Regards,
Marcus
-- 
 Public Key available via: https://keybase.io/marcus_schaefer/key.asc
 keybase search marcus_schaefer
 -------------------------------------------------------
 Marcus Schäfer                 Am Unterösch 9
 Tel: +49 7562 905437           D-88316 Isny / Rohrdorf
 Germany
 -------------------------------------------------------
