Hi

It seems there are several ways to get to the same number in Kafka API.
In Kafka Partitioner which is invoked by KafkaProducer we have this

public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
        List<PartitionInfo> partitions = 
cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
. . .

And then we have KafkaProducer.partitionsFor(topicName);

It appears that the two may result in a different number causing failure during 
internal validation in Partitioner

// they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given 
with record: " + record.partition()
                                                   + " is not in the range 
[0..."
                                                   + numPartitions
                                                   + "].”);


Basically we have RoundRobin partitioner that uses 
KafkaProducer.partitionsFor(topicName) to calculate the cycle, but getting the 
above error.
Could someone please explain the difference between the two methods to get 
partitions size?

Cheers
Oleg








Reply via email to