jsancio commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622455380
########## File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java ########## @@ -67,6 +74,30 @@ protected LeaderState( } this.grantingVoters.addAll(grantingVoters); this.log = logContext.logger(LeaderState.class); + this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); + } + + public BatchAccumulator<T> accumulator() { + return this.accumulator; + } + + private static List<Voter> convertToVoters(Set<Integer> voterIds) { + return voterIds.stream() + .map(follower -> new Voter().setVoterId(follower)) + .collect(Collectors.toList()); + } + + public void appendLeaderChangeMessage(long currentTimeMs) { + List<Voter> voters = convertToVoters(voterStates.keySet()); + List<Voter> grantingVoters = convertToVoters(this.grantingVoters()); + + LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage() + .setLeaderId(this.election().leaderId()) + .setVoters(voters) + .setGrantingVoters(grantingVoters); + + accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs); + accumulator.forceDrain(); Review comment: We `forceDrain` inside `appendLeaderChangeMessage` before adding the leader change message to `completed`. Is this `forceDrain()` a no-op? Should the current batch be empty at this point? If we don't need to call this method, I say we remove it altogether. 
########## File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java ########## @@ -65,6 +66,132 @@ ); } + @Test + public void testLeaderChangeMessageWritten() { + int leaderEpoch = 17; + long baseOffset = 0; + int lingerMs = 50; + int maxBatchSize = 512; + + ByteBuffer buffer = ByteBuffer.allocate(256); + Mockito.when(memoryPool.tryAllocate(256)) + .thenReturn(buffer); + + BatchAccumulator<String> acc = buildAccumulator( + leaderEpoch, + baseOffset, + lingerMs, + maxBatchSize + ); + + acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds()); Review comment: Let's check that `needsDrain` returns true after this point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org