ruanliang-hualun commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r853824668


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1 hosts (tp1, tp2), node2 hosts (tp3, tp4)
+        // add tp4 on node2
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+                Collections.emptySet(), Collections.emptySet());
+
+        // seed one record per partition
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // the first drain should return one batch per node, for tp1 and tp3
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        judgeValidTp(batches1, tp1, tp3);
+
+        // append another record each to tp1 and tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // the second drain must return tp2 and tp4: the rotation keeps the fresh tp1/tp3 data from starving them
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        judgeValidTp(batches2, tp2, tp4);
+    }
+
+    // assert that the drained map contains exactly one batch for each expected partition
+    private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
+        assertEquals(tp.length, batches1.size());
+        List<ProducerBatch> list = new ArrayList<>();
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches1.entrySet()) {
+            List<ProducerBatch> batches = entry.getValue();
+            list.add(batches.get(0));

Review Comment:
   Thanks for your advice, I benefited a lot from it.
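   
   For anyone skimming the thread, the behavior this test pins down: RecordAccumulator.drain
   walks each node's partitions starting from a rotating drain index, so consecutive drains
   start at different partitions and earlier partitions cannot permanently starve later ones.
   A minimal standalone sketch of that rotation idea follows; the class and method names are
   illustrative assumptions, not the actual RecordAccumulator internals:
   
       import java.util.Arrays;
       import java.util.List;
       
       // Hypothetical sketch of round-robin drain rotation, not Kafka's real code.
       public class DrainRotationSketch {
           private int drainIndex = 0; // advances after every drain pass
       
           // Pick the partition to drain first this pass, then rotate the start position.
           String nextStart(List<String> partitions) {
               String start = partitions.get(drainIndex % partitions.size());
               drainIndex++;
               return start;
           }
       
           public static void main(String[] args) {
               DrainRotationSketch sketch = new DrainRotationSketch();
               List<String> node1Parts = Arrays.asList("tp1", "tp2");
               System.out.println(sketch.nextStart(node1Parts)); // tp1 drains first on pass one
               System.out.println(sketch.nextStart(node1Parts)); // tp2 drains first on pass two
           }
       }
   
   Under that rotation, the test's second drain starting at tp2/tp4 is exactly what the
   assertions above expect.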


