artemlivshits commented on code in PR #17977:
URL: https://github.com/apache/kafka/pull/17977#discussion_r1906249224

##########
clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java:
##########
@@ -53,35 +53,40 @@ public void configure(Map<String, ?> configs) {}
      */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-        TopicPartition prevPartition = previousPartition.get();
-        if (prevPartition != null) {
-            previousPartition.remove();
-            if (topic.equals(prevPartition.topic())) {
-                return prevPartition.partition();
-            }
-        }
-
-        int nextValue = nextValue(topic);
+        Integer lastPartition = topicLastPartitionMap.get(topic);
         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
-        if (!availablePartitions.isEmpty()) {
-            int part = Utils.toPositive(nextValue) % availablePartitions.size();
-            return availablePartitions.get(part).partition();
-        } else {
-            // no partitions are available, give a non-available partition
-            int numPartitions = cluster.partitionsForTopic(topic).size();
-            return Utils.toPositive(nextValue) % numPartitions;
+
+        if (lastPartition != null && !availablePartitions.isEmpty()) {
+            return availablePartitions.get(lastPartition).partition();

Review Comment:
   Looks like `lastPartition` can be calculated from some previous value of `availablePartitions`, but by now there could be fewer partitions available. In that case the stored index may be past the end of the current list, and `availablePartitions.get(lastPartition)` would throw.
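
To make the concern concrete, here is a minimal stand-alone sketch. It does not use the Kafka client classes: `StaleIndexSketch`, `pickUnchecked`, and `pickGuarded` are hypothetical names, and a plain `List<Integer>` of partition ids stands in for `availablePartitions`. The modulo guard is only one possible way to handle a shrunken list, not necessarily what the PR should adopt.

```java
import java.util.List;

// Hypothetical illustration only; a List<Integer> of partition ids stands in
// for the List<PartitionInfo> returned by cluster.availablePartitionsForTopic().
final class StaleIndexSketch {

    // Mirrors the pattern in the diff: the cached index is used as-is.
    static int pickUnchecked(Integer lastIndex, List<Integer> available) {
        if (lastIndex != null && !available.isEmpty()) {
            // Throws IndexOutOfBoundsException if lastIndex >= available.size().
            return available.get(lastIndex);
        }
        return -1; // placeholder for "fall through to the normal selection path"
    }

    // One possible guard (an assumption, not the PR's code): re-map the cached
    // index onto the current list size before using it.
    static int pickGuarded(Integer lastIndex, List<Integer> available) {
        if (lastIndex != null && !available.isEmpty()) {
            return available.get(Math.floorMod(lastIndex, available.size()));
        }
        return -1; // same placeholder as above
    }

    public static void main(String[] args) {
        int cachedIndex = 3;                       // computed while 4 partitions were available
        List<Integer> availableNow = List.of(0, 2); // only 2 partitions are available now

        System.out.println(pickGuarded(cachedIndex, availableNow)); // prints 2

        try {
            pickUnchecked(cachedIndex, availableNow);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("stale index is out of range for the current list");
        }
    }
}
```

Note that re-mapping with a modulo keeps the call from throwing, but it may return a different partition than the one originally chosen; whether that is acceptable depends on what the cached value is meant to guarantee.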