hachikuji commented on a change in pull request #10480: URL: https://github.com/apache/kafka/pull/10480#discussion_r622470077
##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -67,6 +74,30 @@ protected LeaderState(
         }
         this.grantingVoters.addAll(grantingVoters);
         this.log = logContext.logger(LeaderState.class);
+        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+    }
+
+    public BatchAccumulator<T> accumulator() {
+        return this.accumulator;
+    }
+
+    private static List<Voter> convertToVoters(Set<Integer> voterIds) {
+        return voterIds.stream()
+            .map(follower -> new Voter().setVoterId(follower))
+            .collect(Collectors.toList());
+    }
+
+    public void appendLeaderChangeMessage(long currentTimeMs) {
+        List<Voter> voters = convertToVoters(voterStates.keySet());
+        List<Voter> grantingVoters = convertToVoters(this.grantingVoters());
+
+        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+            .setLeaderId(this.election().leaderId())
+            .setVoters(voters)
+            .setGrantingVoters(grantingVoters);
+
+        accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs);
+        accumulator.forceDrain();

Review comment:
It was my suggestion in order to keep the contract of `BatchAccumulator` tight. We do "know" in this usage that the accumulator will be empty, but it costs us very little to protect the API without relying on external assumptions.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
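
The idea in the review comment, protecting an API instead of trusting the caller's knowledge that the accumulator is empty, can be illustrated with a small sketch. Everything here is hypothetical: `ToyAccumulator`, `appendControlRecord`, and `completedBatches` are illustrative names and not Kafka's `BatchAccumulator` API. The sketch also assumes that the invariant being guarded is "a control record always ends up in its own batch"; the comment itself does not say which invariant is meant.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for an accumulator; NOT Kafka's BatchAccumulator. */
final class ToyAccumulator {
    // Records appended but not yet sealed into a batch.
    private final List<String> pending = new ArrayList<>();
    // Sealed batches, in the order they were completed.
    private final List<List<String>> completed = new ArrayList<>();

    void append(String record) {
        pending.add(record);
    }

    /** Seals any pending records into a completed batch; a no-op when nothing is pending. */
    void forceDrain() {
        if (!pending.isEmpty()) {
            completed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    /**
     * Appends a control record in a batch of its own (assumed invariant).
     * The drain is nearly free when the accumulator is already empty, and it
     * keeps the method correct even if a caller has left records pending.
     */
    void appendControlRecord(String control) {
        forceDrain();
        completed.add(Collections.singletonList(control));
    }

    /** Returns a snapshot of the completed batches. */
    List<List<String>> completedBatches() {
        return new ArrayList<>(completed);
    }
}
```

With this shape, a caller that appends two records and then a control record gets two completed batches, the data batch followed by the control batch, while a caller that starts from an empty accumulator gets just the control batch. Neither caller needs to know or promise anything about the accumulator's prior state.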