showuon commented on code in PR #12066: URL: https://github.com/apache/kafka/pull/12066#discussion_r854850350
########## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ########## @@ -98,6 +98,66 @@ public void teardown() { this.metrics.close(); } + @Test + public void testDrainBatchesStarve() throws Exception { + // test case: node1(tp1,tp2) , node2(tp3,tp4) + // add tp-4 + int partition4 = 3; + TopicPartition tp4 = new TopicPartition(topic, partition4); + PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null); + + long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD; + RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10); + Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4), + Collections.emptySet(), Collections.emptySet()); + + // initial data + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + + // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained + Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0); + verifyTopicPartitionInBatches(batches1, tp1, tp3); + + // add record for tp1, tp3 + accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds()); + + // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained + // The drain index 
should start from next topic partition, that is, node1 => tp2, node2 => tp4 + Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0); + verifyTopicPartitionInBatches(batches2, tp2, tp4); + + // make sure in next run, the drain index will start from the beginning + Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0); + verifyTopicPartitionInBatches(batches3, tp1, tp3); + + // test the continue case, mute the tp4 and drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted) + // add record for tp2, tp3, tp4 mute the tp4 Review Comment: nit: `// add record for tp2, tp3, tp4, [and] mute the tp4` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org