artemlivshits commented on code in PR #17977:
URL: https://github.com/apache/kafka/pull/17977#discussion_r1906249224


##########
clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java:
##########
@@ -53,35 +53,40 @@ public void configure(Map<String, ?> configs) {}
      */
     @Override
     public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
-        TopicPartition prevPartition = previousPartition.get();
-        if (prevPartition != null) {
-            previousPartition.remove();
-            if (topic.equals(prevPartition.topic())) {
-                return prevPartition.partition();
-            }
-        }
-
-        int nextValue = nextValue(topic);
+        Integer lastPartition = topicLastPartitionMap.get(topic);
         List<PartitionInfo> availablePartitions = 
cluster.availablePartitionsForTopic(topic);
-        if (!availablePartitions.isEmpty()) {
-            int part = Utils.toPositive(nextValue) % 
availablePartitions.size();
-            return availablePartitions.get(part).partition();
-        } else {
-            // no partitions are available, give a non-available partition
-            int numPartitions = cluster.partitionsForTopic(topic).size();
-            return Utils.toPositive(nextValue) % numPartitions;
+
+        if (lastPartition != null && !availablePartitions.isEmpty()) {
+            return availablePartitions.get(lastPartition).partition();

Review Comment:
   Looks like we can calculate lastPartitions based on some previous value of 
availablePartitions, but now there could be less partitions available.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to