jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r611787145



##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {
+        maybeCompleteDrain();
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+                    this.nextOffset, 
+                    currentTimeMs, 
+                    this.epoch, 
+                    buffer, 
+                    leaderChangeMessage);
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                Optional.empty(),
+                data,
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
+        }
+    }
+
+    public void flush() {

Review comment:
       I don't think we should expose this functionality. I think users of 
these type will always call `flush` after calling `appendLeaderChangeMessage`. 
If so why not do that implicitly in the method.

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -19,10 +19,14 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.mockito.Mockito;

Review comment:
       This order of import doesn't match any of the styles used in Kafka.

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest<T> {

Review comment:
       Why is the `T` leaked to the tests? Glancing at the code the type 
parameter `T` is never used. Did you try changing the signature to:
   
   ```
   private final BatchAccumulator<?> accumulator = 
Mockito.mock(BatchAccumulator.class);
   ```

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage 
leaderChangeMessage, long currentTimeMs) {

Review comment:
       This method is not safe. I think you need to hold a lock before falling 
`maybeCompleteDrain` and updating `nextOffset`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to