hachikuji commented on a change in pull request #10063:
URL: https://github.com/apache/kafka/pull/10063#discussion_r578870759



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -298,28 +342,30 @@ public void close() {
         public final List<T> records;
         public final MemoryRecords data;
         private final MemoryPool pool;
-        private final ByteBuffer buffer;
+        // Buffer that was allocated by the MemoryPool (pool). This may not be the buffer used in
+        // the MemoryRecords (data) object.
+        private final ByteBuffer pooledBuffer;
 
         private CompletedBatch(
             long baseOffset,
             List<T> records,
             MemoryRecords data,
             MemoryPool pool,
-            ByteBuffer buffer
+            ByteBuffer pooledBuffer

Review comment:
       nit: might be worth using the same name in `BatchBuilder`. Currently we use `initialBuffer`
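
For illustration, a minimal sketch of the naming alignment being suggested; the classes below are simplified, hypothetical stand-ins rather than the actual `BatchAccumulator.CompletedBatch` or `BatchBuilder` signatures from this PR:

```java
import java.nio.ByteBuffer;

// Simplified stand-ins, used only to illustrate keeping the pool-allocated
// buffer under a single name ("pooledBuffer") in both classes.
final class BufferNamingSketch {

    // Mirrors the rename in the diff above: the batch keeps a reference to the
    // buffer allocated by the MemoryPool, which may differ from the buffer
    // backing the MemoryRecords.
    static final class CompletedBatch {
        private final ByteBuffer pooledBuffer;

        CompletedBatch(ByteBuffer pooledBuffer) {
            this.pooledBuffer = pooledBuffer;
        }
    }

    // BatchBuilder currently names this parameter "initialBuffer"; the
    // suggestion is to call it "pooledBuffer" there as well.
    static final class BatchBuilder {
        private final ByteBuffer pooledBuffer;

        BatchBuilder(ByteBuffer pooledBuffer) {
            this.pooledBuffer = pooledBuffer;
        }
    }
}
```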

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -164,47 +170,84 @@ public void testUnflushedBuffersReleasedByClose() {
 
     @Test
     public void testSingleBatchAccumulation() {
-        int leaderEpoch = 17;
-        long baseOffset = 157;
-        int lingerMs = 50;
-        int maxBatchSize = 512;
-
-        Mockito.when(memoryPool.tryAllocate(maxBatchSize))
-            .thenReturn(ByteBuffer.allocate(maxBatchSize));
-
-        BatchAccumulator<String> acc = buildAccumulator(
-            leaderEpoch,
-            baseOffset,
-            lingerMs,
-            maxBatchSize
-        );
-
-        List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
-        assertEquals(baseOffset, acc.append(leaderEpoch, records.subList(0, 1)));
-        assertEquals(baseOffset + 2, acc.append(leaderEpoch, records.subList(1, 3)));
-        assertEquals(baseOffset + 5, acc.append(leaderEpoch, records.subList(3, 6)));
-        assertEquals(baseOffset + 7, acc.append(leaderEpoch, records.subList(6, 8)));
-        assertEquals(baseOffset + 8, acc.append(leaderEpoch, records.subList(8, 9)));
-
-        time.sleep(lingerMs);
-        assertTrue(acc.needsDrain(time.milliseconds()));
-
-        List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
-        assertEquals(1, batches.size());
-        assertFalse(acc.needsDrain(time.milliseconds()));
-        assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
-
-        BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
-        assertEquals(records, batch.records);
-        assertEquals(baseOffset, batch.baseOffset);
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            time.sleep(lingerMs);
+            assertTrue(acc.needsDrain(time.milliseconds()));
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(1, batches.size());
+            assertFalse(acc.needsDrain(time.milliseconds()));
+            assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
+
+            BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+            assertEquals(records, batch.records);
+            assertEquals(baseOffset, batch.baseOffset);
+        });
     }
 
     @Test
     public void testMultipleBatchAccumulation() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 256;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            // Append entries until we have 4 batches to drain (3 completed, 1 building)
+            while (acc.numCompletedBatches() < 3) {
+                appender.call(acc, leaderEpoch, singletonList("foo"));
+            }
+
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+            assertEquals(4, batches.size());
+            assertTrue(batches.stream().allMatch(batch -> batch.data.sizeInBytes() <= maxBatchSize));
+        });
+    }
+
+    @Test void testRecordsAreSplit() {

Review comment:
       nit: it's a little weird for this test alone to have the annotation on the same line as the method definition
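
For reference, a minimal sketch of the layout the other tests in this file follow, with the annotation on its own line (the method body is a placeholder, not the PR's actual test):

```java
import org.junit.jupiter.api.Test;

class AnnotationLayoutSketch {

    // Annotation on its own line above the method, matching the surrounding tests.
    @Test
    void testRecordsAreSplit() {
        // test body omitted in this sketch
    }
}
```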

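The refactored tests above iterate over two appenders, `APPEND` and `APPEND_ATOMIC`, and invoke them via `appender.call(acc, leaderEpoch, records)`. Their definitions are not part of this diff; below is a hypothetical sketch of what such a helper could look like, assuming the two constants simply delegate to `BatchAccumulator#append` and `BatchAccumulator#appendAtomic`:

```java
import java.util.List;

import org.apache.kafka.raft.internals.BatchAccumulator;

// Hypothetical definitions: the real APPEND/APPEND_ATOMIC live in
// BatchAccumulatorTest and may differ from this sketch.
final class AppenderSketch {

    // Functional interface matching the appender.call(acc, epoch, records)
    // usage seen in the diff above.
    interface Appender {
        Long call(BatchAccumulator<String> acc, int epoch, List<String> records);
    }

    // Assumed to delegate to the accumulator's append and appendAtomic methods.
    static final Appender APPEND = BatchAccumulator::append;
    static final Appender APPEND_ATOMIC = BatchAccumulator::appendAtomic;
}
```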


