showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854733717


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);

Review Comment:
   I think we can add another line to make sure it'll pick tp1, tp3 in the next run. That is:
   ```java
           // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
           // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
           Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
           verifyTopicPartitionInBatches(batchss2, tp2, tp4);

           // make sure in the next run, the drain index will start from the beginning
           Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
           verifyTopicPartitionInBatches(batches3, tp1, tp3);
   ```



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);

Review Comment:
   typo: `batchss2` -> `batches2`



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+

Review Comment:
   Please remove this line. Thanks.



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);
+    }
+
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());
+        List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
+            List<ProducerBatch> batchList = entry.getValue();
+            assertEquals(batchList.size(), 1);

Review Comment:
   The method semantics are `assertEquals(expected, actual)`.
   So, we should do: `assertEquals(1, batchList.size());`, right?
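   For illustration (a hypothetical failure, not from this PR): if the map unexpectedly held 2 batches, the swapped form reports the values backwards, while the correct order reads naturally:
   ```java
   // swapped arguments: fails with "expected: <2> but was: <1>", implying we expected 2 batches
   assertEquals(batchList.size(), 1);
   // correct order: fails with "expected: <1> but was: <2>", matching the real intent
   assertEquals(1, batchList.size());
   ```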



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);
+    }
+
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());
+        List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
+            List<ProducerBatch> batchList = entry.getValue();
+            assertEquals(batchList.size(), 1);
+            topicPartitionsInBatch.add(batchList.get(0).topicPartition);
+        }
+
+        for (int i = 0 ; i < tp.length ; i++){
+            assertEquals(topicPartitionsInBatch.get(i), tp[i]);

Review Comment:
   same here:
   `assertEquals(tp[i], topicPartitionsInBatch.get(i));`



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);
+    }
+
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());

Review Comment:
   nit: add space after comma (,)



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -559,13 +560,12 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         int size = 0;
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
-        /* to make starvation less likely this loop doesn't start at 0 */
+        /* to make starvation less likely each node has it's own drainIndex */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-            this.drainIndex = (this.drainIndex + 1) % parts.size();
-
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;

Review Comment:
   Having a second look, I found this change is not right. If the tp is muted (or skipped for any other reason), the loop hits `continue` without advancing `drainIndex`, so we'll end up in an infinite loop. Please fix it and add a test for this case. Thanks.
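   A minimal sketch of one way to fix it (not the actual patch; `updateDrainIndex` is a hypothetical counterpart to this PR's `getDrainIndex`): advance the per-node index at the top of each iteration, before any `continue`, so the cursor always moves and the loop's exit condition is eventually reached:
   ```java
   int drainIndex = getDrainIndex(node.idString());
   int start = drainIndex = drainIndex % parts.size();
   do {
       PartitionInfo part = parts.get(drainIndex);
       TopicPartition tp = new TopicPartition(part.topic(), part.partition());
       // advance first: even when we skip this partition below, the cursor
       // still moves, so a muted tp can no longer stall the loop forever
       drainIndex = (drainIndex + 1) % parts.size();
       updateDrainIndex(node.idString(), drainIndex); // hypothetical: persist the per-node index
       if (isMuted(tp))
           continue;
       // ... drain a batch for tp, respecting the max request size ...
   } while (start != drainIndex);
   ```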



##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());

Review Comment:
   We don't need to verify the size because we'll verify it in 
`verifyTopicPartitionInBatches`, right?


