jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r611787145
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+        maybeCompleteDrain();
+        ByteBuffer buffer = memoryPool.tryAllocate(256);
+        if (buffer != null) {
+            MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+                this.nextOffset,
+                currentTimeMs,
+                this.epoch,
+                buffer,
+                leaderChangeMessage);
+            completed.add(new CompletedBatch<>(
+                nextOffset,
+                Optional.empty(),
+                data,
+                memoryPool,
+                buffer
+            ));
+            nextOffset += 1;
+        }
+    }
+
+    public void flush() {

Review comment:
I don't think we should expose this functionality. I think users of this type will always call `flush` after calling `appendLeaderChangeMessage`. If so, why not do that implicitly in the method?

##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -19,10 +19,14 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.mockito.Mockito;

Review comment:
This import order doesn't match any of the styles used in Kafka.
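To illustrate the `flush` suggestion from the first comment above, here is a minimal sketch of an accumulator that keeps `flush` private and invokes it from the control-record append. Everything in it is an assumption for illustration: `ImplicitFlushAccumulator`, `appendControlRecord`, and the string records are hypothetical stand-ins, and since the body of `flush` is not visible in the quoted hunk, it is modeled here simply as "close the current batch".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an accumulator; this is NOT Kafka's BatchAccumulator.
final class ImplicitFlushAccumulator {
    private final List<String> pending = new ArrayList<>();         // records in the open batch
    private final List<List<String>> completed = new ArrayList<>(); // closed batches

    // Ordinary append: the record stays in the open batch.
    void append(String record) {
        pending.add(record);
    }

    // Control-record append: flushes implicitly, so callers cannot forget to.
    void appendControlRecord(String controlRecord) {
        pending.add(controlRecord);
        flush();
    }

    // Private on purpose: no caller outside this class needs it.
    // Assumption: "flush" is modeled as closing the open batch.
    private void flush() {
        if (!pending.isEmpty()) {
            completed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    List<List<String>> completedBatches() {
        return completed;
    }

    public static void main(String[] args) {
        ImplicitFlushAccumulator acc = new ImplicitFlushAccumulator();
        acc.append("record-0");
        acc.appendControlRecord("leader-change");
        System.out.println(acc.completedBatches());
    }
}
```

With this shape the public surface shrinks to the two append methods, which is the point of the comment: if every caller pairs the two calls anyway, the pairing can live inside the class.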
##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -36,30 +36,31 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LeaderStateTest {
+public class LeaderStateTest<T> {

Review comment:
Why is the `T` leaked to the tests? Glancing at the code, the type parameter `T` is never used. Did you try changing the signature to:
```
private final BatchAccumulator<?> accumulator = Mockito.mock(BatchAccumulator.class);
```

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,45 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {

Review comment:
This method is not thread-safe. I think you need to hold a lock before calling `maybeCompleteDrain` and updating `nextOffset`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
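For the locking concern raised in the last review comment, the following is a rough sketch of guarding the drain step and the offset update with a single lock. All names are assumptions: `LockedOffsetAccumulator`, `appendControlRecord`, and the field `appendLock` are hypothetical, and the list of offsets stands in for the `completed` batches in the diff. It is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in; this is NOT Kafka's BatchAccumulator.
final class LockedOffsetAccumulator {
    private final ReentrantLock appendLock = new ReentrantLock(); // placeholder lock name
    private final List<Long> completedOffsets = new ArrayList<>();
    private long nextOffset = 0L;

    // The read-modify-write on nextOffset and the list update happen under one lock,
    // so two threads can never be handed the same offset.
    void appendControlRecord() {
        appendLock.lock();
        try {
            completedOffsets.add(nextOffset); // stands in for drain + completed.add(...)
            nextOffset += 1;
        } finally {
            appendLock.unlock();
        }
    }

    long nextOffset() {
        appendLock.lock();
        try {
            return nextOffset;
        } finally {
            appendLock.unlock();
        }
    }

    // Returns a copy so callers never see the list mid-update.
    List<Long> completedOffsets() {
        appendLock.lock();
        try {
            return new ArrayList<>(completedOffsets);
        } finally {
            appendLock.unlock();
        }
    }

    // Runs several writer threads against one accumulator and waits for them to finish.
    static LockedOffsetAccumulator run(int threads, int appendsPerThread) {
        LockedOffsetAccumulator acc = new LockedOffsetAccumulator();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < appendsPerThread; i++) {
                    acc.appendControlRecord();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        LockedOffsetAccumulator acc = run(4, 1000);
        System.out.println(acc.nextOffset());
    }
}
```

Without the lock, `nextOffset += 1` is a non-atomic read-modify-write, so concurrent callers could record duplicate offsets or lose increments; holding the lock across both steps rules that out.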