showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854850350


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        //  initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from the next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+        // make sure in next run, the drain index will start from the beginning
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+        // test the continue case, mute the tp4 and drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
+        // add record for tp2, tp3, tp4  mute the tp4

Review Comment:
   nit: `// add record for tp2, tp3, tp4, [and] mute the tp4`
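
   For context, the assertions in this test go through a `verifyTopicPartitionInBatches` helper that is not shown in the quoted hunk. A minimal sketch of what such a helper could look like (an illustration only, not necessarily the PR's actual implementation; it assumes the test class's existing JUnit and `java.util`/`java.util.stream` imports, plus the package-level access the test already has to `ProducerBatch.topicPartition`):
   
       // Collect the topic partitions of all drained batches across nodes and
       // assert they exactly match the expected partitions (order-insensitive,
       // since drain() returns a map keyed by node id).
       private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> nodeBatches,
                                                  TopicPartition... expectedTps) {
           Set<TopicPartition> drained = nodeBatches.values().stream()
               .flatMap(List::stream)
               .map(batch -> batch.topicPartition)
               .collect(Collectors.toSet());
           assertEquals(new HashSet<>(Arrays.asList(expectedTps)), drained);
       }
   
   A set comparison keeps the check independent of node iteration order; a stricter variant could additionally assert which node each partition was drained to.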



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
