ruanliang-hualun commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854849210
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
this.metrics.close();
}
+ @Test
+ public void testDrainBatchesStarve() throws Exception {
+ // test case: node1(tp1,tp2) , node2(tp3,tp4)
+ // add tp-4
+ int partition4 = 3;
+ TopicPartition tp4 = new TopicPartition(topic, partition4);
+ PartitionInfo part4 = new PartitionInfo(topic, partition4, node2,
null, null);
+
+ long batchSize = value.length +
DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+ RecordAccumulator accum = createTestRecordAccumulator( (int)
batchSize, 1024, CompressionType.NONE, 10);
+ Cluster cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2, part3, part4),
+ Collections.emptySet(), Collections.emptySet());
+
+ // initial data
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+
+ // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the
max request size is full after the first batch drained
+ Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+ // add record for tp1, tp3
+ accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+
+ // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the
max request size is full after the first batch drained
+ // The drain index should start from next topic partition, that is,
node1 => tp2, node2 => tp4
+ Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+ // make sure in next run, the drain index will start from the beginning
+ Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+ // test the contine case, mute the tp4 and drain batches from 2nodes:
node1 => tp2, node2 => tp3 (because tp4 is muted)
+ // add record for tp2, tp3, tp4 mute the tp4
+ accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null,
maxBlockTimeMs, false, time.milliseconds());
+ accum.mutePartition(tp4);
+ Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new
HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
Review Comment:
thanks for the detailed reviews, I learned a lot
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]