showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r852661339

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -81,7 +82,7 @@ public class RecordAccumulator {
     private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
-    private int drainIndex;
+    private Map<String,Integer> nodesDrainIndex;

Review Comment:
   nit: need a space between `String,` and `Integer`: `Map<String, Integer>`



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -560,12 +561,14 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
         /* to make starvation less likely this loop doesn't start at 0 */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
+        updateDrainIndex(node.idString(),drainIndex);

Review Comment:
   I think we can update the drain index once, at the end of `drainBatchesForOneNode`. We don't need to update it on every iteration.
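
   For illustration, a minimal sketch of the change suggested above, not the actual patch. `DrainIndexSketch`, `drainOrder`, `maxParts`, and the `writes` counter are hypothetical stand-ins: the real method works on `Cluster`, `Node`, and `ProducerBatch`, and its stopping conditions are different. Only `nodesDrainIndex`, `getDrainIndex`, and `updateDrainIndex` are names taken from the diff. The loop advances a local `drainIndex` and writes it back to the map once, after the loop:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for RecordAccumulator; only the drain-index bookkeeping is modeled.
class DrainIndexSketch {
    // Per-node position at which the next drain should start.
    private final Map<String, Integer> nodesDrainIndex = new HashMap<>();
    // Counts map writes so the effect of the suggestion is observable (illustration only).
    int writes = 0;

    private int getDrainIndex(String nodeId) {
        return nodesDrainIndex.getOrDefault(nodeId, 0);
    }

    private void updateDrainIndex(String nodeId, int drainIndex) {
        nodesDrainIndex.put(nodeId, drainIndex);
        writes++;
    }

    // Visits up to maxParts partitions in round-robin order, starting at the
    // node's saved index. maxParts is an assumed stand-in for the real early exits.
    List<String> drainOrder(String nodeId, List<String> parts, int maxParts) {
        List<String> visited = new ArrayList<>();
        if (parts.isEmpty()) {
            return visited;
        }
        // Read the saved index once and keep working on a local copy.
        int drainIndex = getDrainIndex(nodeId) % parts.size();
        int start = drainIndex;
        do {
            visited.add(parts.get(drainIndex));
            drainIndex = (drainIndex + 1) % parts.size();
        } while (visited.size() < maxParts && drainIndex != start);
        // Single write-back at the end instead of one per iteration.
        updateDrainIndex(nodeId, drainIndex);
        return visited;
    }

    public static void main(String[] args) {
        DrainIndexSketch acc = new DrainIndexSketch();
        List<String> parts = Arrays.asList("p0", "p1", "p2");
        System.out.println(acc.drainOrder("1", parts, 2)); // [p0, p1]
        System.out.println(acc.drainOrder("1", parts, 2)); // [p2, p0]
    }
}
```

   Since the comment in the diff says these variables are only accessed by the sender thread, deferring the write should not change what other threads observe; that reasoning is an assumption based on that comment, not something verified against the full class.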



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -560,12 +561,14 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
         /* to make starvation less likely this loop doesn't start at 0 */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
+        updateDrainIndex(node.idString(),drainIndex);
         do {
             PartitionInfo part = parts.get(drainIndex);
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-            this.drainIndex = (this.drainIndex + 1) % parts.size();
-
+            drainIndex = (drainIndex + 1) % parts.size();
+            updateDrainIndex(node.idString(),drainIndex);

Review Comment:
   Same here: we don't need to update it on every iteration.