showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r854844189

##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+            Collections.emptySet(), Collections.emptySet());
+
+        // initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+        // make sure in next run, the drain index will start from the beginning
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+        // test the contine case, mute the tp4 and drain batches from 2nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
+        // add record for tp2, tp3, tp4 mute the tp4
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.mutePartition(tp4);
+        Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);

Review Comment:
   add the above comment here, and change to `drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)`

##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+            Collections.emptySet(), Collections.emptySet());
+
+        // initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        assertEquals(2, batches1.size());
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batchss2, tp2, tp4);
+    }
+
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
+        assertEquals(tp.length,batches.size());

Review Comment:
   need space after comma (,)

##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +98,66 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
+            Collections.emptySet(), Collections.emptySet());
+
+        // initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches1, tp1, tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
+        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
+        Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches2, tp2, tp4);
+
+        // make sure in next run, the drain index will start from the beginning
+        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+        verifyTopicPartitionInBatches(batches3, tp1, tp3);
+
+        // test the contine case, mute the tp4 and drain batches from 2nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
+        // add record for tp2, tp3, tp4 mute the tp4

Review Comment:
   remove the 1st comment: `// test the contine case,...`

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -559,13 +559,14 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         int size = 0;
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
-        /* to make starvation less likely this loop doesn't start at 0 */
+        /* to make starvation less likely each node has it's own drainIndex */
+        int drainIndex = getDrainIndex(node.idString());
         int start = drainIndex = drainIndex % parts.size();
         do {
             PartitionInfo part = parts.get(drainIndex);
             TopicPartition tp = new TopicPartition(part.topic(), part.partition());
-            this.drainIndex = (this.drainIndex + 1) % parts.size();
-
+            updateDrainIndex(node.idString(), drainIndex);

Review Comment:
   Thanks for the explanation. Make sense.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
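
(Editor's note for context: the RecordAccumulator diff above calls getDrainIndex(node.idString()) and updateDrainIndex(node.idString(), drainIndex) but does not show their definitions. Below is a minimal sketch of what such per-node bookkeeping could look like; the map-based helpers and the wrapper class name are assumptions for illustration only, not the PR's actual implementation.)

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch: each node id keeps its own round-robin drain position,
    // so draining one node never advances or resets another node's index.
    class PerNodeDrainIndexSketch {
        private final Map<String, Integer> nodesDrainIndex = new HashMap<>();

        // Last saved drain index for this node; an unseen node starts at partition index 0.
        int getDrainIndex(String nodeId) {
            return nodesDrainIndex.getOrDefault(nodeId, 0);
        }

        // Remember where draining stopped so the next drain for this node resumes
        // from that position instead of restarting at the first partition.
        void updateDrainIndex(String nodeId, int drainIndex) {
            nodesDrainIndex.put(nodeId, drainIndex);
        }
    }

The design point being reviewed is that a single shared drainIndex lets whichever node is drained first skew the starting partition for every other node, which can starve partitions; a per-node index, as sketched above, keeps each node's round-robin independent.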