jsancio commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r622455380



##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -67,6 +74,30 @@ protected LeaderState(
         }
         this.grantingVoters.addAll(grantingVoters);
         this.log = logContext.logger(LeaderState.class);
+        this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+    }
+
+    public BatchAccumulator<T> accumulator() {
+        return this.accumulator;
+    }
+
+    private static List<Voter> convertToVoters(Set<Integer> voterIds) {
+        return voterIds.stream()
+            .map(follower -> new Voter().setVoterId(follower))
+            .collect(Collectors.toList());
+    }
+
+    public void appendLeaderChangeMessage(long currentTimeMs) {
+        List<Voter> voters = convertToVoters(voterStates.keySet());
+        List<Voter> grantingVoters = convertToVoters(this.grantingVoters());
+
+        LeaderChangeMessage leaderChangeMessage = new LeaderChangeMessage()
+            .setLeaderId(this.election().leaderId())
+            .setVoters(voters)
+            .setGrantingVoters(grantingVoters);
+        
+        accumulator.appendLeaderChangeMessage(leaderChangeMessage, currentTimeMs);
+        accumulator.forceDrain();

Review comment:
       We `forceDrain` inside `appendLeaderChangeMessage` before adding the 
leader change message to `completed`. Is this `forceDrain()` a no-op? Should 
the current batch be empty at this point? If we don't need to call this method, 
I say we remove it altogether.
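
To make the question concrete, here is a toy model of the behavior being 
asked about. It is a sketch under assumptions, not the real 
`BatchAccumulator`: `ToyAccumulator`, `appendControlRecord`, and 
`completedCount` are hypothetical names, and it assumes that draining an 
empty current batch does nothing. If the real accumulator behaves this way, 
the extra `forceDrain()` in `LeaderState` changes nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an accumulator; NOT the Kafka implementation.
class ToyAccumulator {
    // Records appended but not yet sealed into a batch.
    private final List<String> current = new ArrayList<>();
    // Sealed batches waiting to be drained by the caller.
    private final List<List<String>> completed = new ArrayList<>();

    public void append(String record) {
        current.add(record);
    }

    // Seals the current batch; assumed to do nothing when it is empty.
    public void forceDrain() {
        if (current.isEmpty()) {
            return;
        }
        completed.add(new ArrayList<>(current));
        current.clear();
    }

    // Models the control-record path: drain first, then add the
    // control record directly to the completed list as its own batch.
    public void appendControlRecord(String record) {
        forceDrain();
        completed.add(Collections.singletonList(record));
    }

    public boolean needsDrain() {
        return !completed.isEmpty();
    }

    public int completedCount() {
        return completed.size();
    }
}
```

In this model, calling `forceDrain()` right after `appendControlRecord(...)` 
leaves `completedCount()` unchanged, because the current batch is already 
empty at that point.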

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -65,6 +66,132 @@
         );
     }
 
+    @Test
+    public void testLeaderChangeMessageWritten() {
+        int leaderEpoch = 17;
+        long baseOffset = 0;
+        int lingerMs = 50;
+        int maxBatchSize = 512;
+
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        Mockito.when(memoryPool.tryAllocate(256))
+            .thenReturn(buffer);
+
+        BatchAccumulator<String> acc = buildAccumulator(
+            leaderEpoch,
+            baseOffset,
+            lingerMs,
+            maxBatchSize
+        );
+
+        acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds());

Review comment:
       Let's check that `needsDrain` returns true after this point.





