showuon commented on code in PR #12066:
URL: https://github.com/apache/kafka/pull/12066#discussion_r853683790
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+

Review Comment:
   nit: remove this empty line


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -81,7 +82,7 @@ public class RecordAccumulator {
     private final IncompleteBatches incomplete;
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
-    private int drainIndex;
+    private Map<String, Integer> nodesDrainIndex;

Review Comment:
   Make it final: `private final Map<String, Integer> nodesDrainIndex;`


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -634,10 +634,20 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
             ready.add(batch);
             batch.drained(now);
+            drainIndex = (drainIndex + 1) % parts.size();
         } while (start != drainIndex);
+        updateDrainIndex(node.idString(), drainIndex);
         return ready;
     }
 
+    int getDrainIndex(String idString) {
+        return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
+    }
+
+    void updateDrainIndex(String idString, int drainIndex) {

Review Comment:
   `updateDrainIndex` should be a private method, please.


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")

Review Comment:
   Why `unchecked`?


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
         this.metrics.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testDrainBatchesStarve() throws Exception {
+        // test case: node1(tp1,tp2) , node2(tp3,tp4)
+        // add tp-4
+        int partition4 = 3;
+        TopicPartition tp4 = new TopicPartition(topic, partition4);
+        PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
+
+        long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
+        RecordAccumulator accum = createTestRecordAccumulator( (int) batchSize, 1024, CompressionType.NONE, 10);
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1,part2, part3,part4),
+            Collections.emptySet(), Collections.emptySet());
+
+        // initial data
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain 2 record for tp1 , tp3
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1,node2)), (int) batchSize, 0);
+        assertEquals(2,batches1.size());
+        judgeValidTp(batches1,tp1,tp3);
+
+        // add record for tp1, tp3
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+
+        // drain 2 record for tp2, tp4
+        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1,node2)), (int) batchSize, 0);
+        judgeValidTp(batchss2,tp2,tp4);

Review Comment:
   We can continue the test to make sure that on the next run the index starts from the beginning again (i.e. `drainIndex % parts.size()`), that is: node1 => tp1, node2 => tp3.


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+    @SuppressWarnings("unchecked")
+    private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
+        assertEquals(tp.length,batches1.entrySet().size());

Review Comment:
   What's the difference between `batches1.entrySet().size()` and `batches1.size()`?
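   To make the "continue the test" suggestion above concrete, a minimal sketch of the extra round (same fixtures as the test; `batches3` is just an illustrative name):

```java
// Append one more record per node so a third drain has something to pick up.
accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());

// The per-node drain index should wrap around (drainIndex % parts.size()),
// so we expect node1 => tp1 and node2 => tp3 again.
Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
judgeValidTp(batches3, tp1, tp3);
```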
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        // drain 2 record for tp1 , tp3
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1,node2)), (int) batchSize, 0);

Review Comment:
   And the same comment applies to the lines below.


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+    @SuppressWarnings("unchecked")
+    private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {

Review Comment:
   The method name is not clear. How about `verifyTopicPartitionInBatches`? Also, rename the argument: `batches1` -> `batches`.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1,part2, part3,part4),
+            Collections.emptySet(), Collections.emptySet());

Review Comment:
   Add a space between `,` and the next argument: `Arrays.asList(part1, part2, part3, part4)`


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        // drain 2 record for tp1 , tp3

Review Comment:
   nit: we can add more context here, e.g. `drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained`


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+    @SuppressWarnings("unchecked")

Review Comment:
   Why `unchecked`?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -634,10 +634,20 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
+    int getDrainIndex(String idString) {

Review Comment:
   `getDrainIndex` should be a private method, please.


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches1.entrySet()) {
+            List<ProducerBatch> batches = entry.getValue();
+            list.add(batches.get(0));

Review Comment:
   We should also verify that the batch list has size 1 here, before adding to the list.
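   Pulling the visibility suggestions together, the drain-index bookkeeping might end up looking like this (a sketch; the body of `updateDrainIndex` is assumed, since the quoted diff cuts off before it):

```java
// Per-node drain index, replacing the old shared int drainIndex.
// The map itself can be final even though its contents change.
private final Map<String, Integer> nodesDrainIndex = new HashMap<>();

// Both accessors are only used inside RecordAccumulator, so they can be private.
private int getDrainIndex(String idString) {
    // a node we have never drained starts at index 0
    return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
}

private void updateDrainIndex(String idString, int drainIndex) {
    // assumed body: simply remember where this node's drain loop stopped
    nodesDrainIndex.put(idString, drainIndex);
}
```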
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+    public void testDrainBatchesStarve() throws Exception {

Review Comment:
   Nice test!


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -560,12 +561,11 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
         List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
         List<ProducerBatch> ready = new ArrayList<>();
         /* to make starvation less likely this loop doesn't start at 0 */

Review Comment:
   Does this comment still apply now?


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        // drain 2 record for tp2, tp4

Review Comment:
   Add more context here, e.g. `The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4`
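   For context, a self-contained toy illustration (not Kafka code; all names are invented) of why a per-node cursor avoids starvation when a drain stops early, analogous to the max request size being reached:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DrainIndexSketch {
    // each node keeps its own round-robin cursor, as in the PR
    private final Map<String, Integer> nodesDrainIndex = new HashMap<>();

    // Drain up to `max` partitions for `node`, resuming from the node's saved cursor.
    List<String> drain(String node, List<String> parts, int max) {
        int drainIndex = nodesDrainIndex.computeIfAbsent(node, s -> 0) % parts.size();
        int start = drainIndex;
        List<String> order = new ArrayList<>();
        do {
            order.add(parts.get(drainIndex));
            drainIndex = (drainIndex + 1) % parts.size();
            if (order.size() >= max)
                break; // analogous to the request being full after one batch
        } while (start != drainIndex);
        nodesDrainIndex.put(node, drainIndex); // resume here next time
        return order;
    }

    public static void main(String[] args) {
        DrainIndexSketch s = new DrainIndexSketch();
        System.out.println(s.drain("node1", Arrays.asList("tp1", "tp2"), 1)); // [tp1]
        System.out.println(s.drain("node1", Arrays.asList("tp1", "tp2"), 1)); // [tp2], no starvation
    }
}
```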
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1,node2)), (int) batchSize, 0);

Review Comment:
   Add a space between `,` and the next argument: node1, node2...


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches1.entrySet()) {
+            List<ProducerBatch> batches = entry.getValue();
+            list.add(batches.get(0));

Review Comment:
   Could we add only the `topicPartition` here, e.g. `list.add(batches.get(0).topicPartition);`? Then in the following assertion we can just do `assertArrayEquals(list, tp)`. WDYT?
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+    private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
+        assertEquals(tp.length,batches1.entrySet().size());
+        List<ProducerBatch> list = new ArrayList();;

Review Comment:
   And for the variable name, maybe `topicPartitionsInBatch` is clearer?
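   Putting the helper suggestions together (rename, size check, collecting `topicPartition`, clearer names), the method might end up roughly like this. A sketch, not the final patch: note that `assertArrayEquals` compares arrays, hence the `toArray()` call, and it assumes the usual JUnit static imports in this test class:

```java
// Hypothetical revision of judgeValidTp with the review suggestions applied.
private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
    assertEquals(tp.length, batches.size());
    List<TopicPartition> topicPartitionsInBatch = new ArrayList<>();
    for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
        List<ProducerBatch> batchList = entry.getValue();
        // exactly one batch should be drained per node in this test
        assertEquals(1, batchList.size());
        topicPartitionsInBatch.add(batchList.get(0).topicPartition);
    }
    assertArrayEquals(tp, topicPartitionsInBatch.toArray());
}
```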
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -98,6 +99,54 @@ public void teardown() {
+        List<ProducerBatch> list = new ArrayList();;

Review Comment:
   There is an extra `;` here.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org