hachikuji commented on a change in pull request #10480:
URL: https://github.com/apache/kafka/pull/10480#discussion_r621603737
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -2235,6 +2199,7 @@ private Long append(int epoch, List<T> records, boolean isAtomic) {
         } else {
             offset = accumulator.append(epoch, records);
         }
+

Review comment:
   nit: the extra line is still there?

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -354,6 +484,7 @@ public void testDrainDoesNotBlockWithConcurrentAppend() throws Exception {
             completedBatch.data.batches().forEach(recordBatch -> {
                 assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch());
             });
         });
+        acc.close();

Review comment:
   Out of curiosity, why do we do this here but not elsewhere in this class?

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,52 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+        appendLock.lock();
+        try {
+            forceDrain();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+                        this.nextOffset,

Review comment:
   nit: alignment looks a little off here. One extra indent?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1859,15 +1819,17 @@ private void appendBatch(
             offsetAndEpoch.offset + 1, Integer.MAX_VALUE);
 
         future.whenComplete((commitTimeMs, exception) -> {
-            int numRecords = batch.records.size();
-            if (exception != null) {
-                logger.debug("Failed to commit {} records at {}", numRecords, offsetAndEpoch, exception);
-            } else {
-                long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
-                double elapsedTimePerRecord = (double) elapsedTime / numRecords;
-                kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
-                logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch);
-                maybeFireHandleCommit(batch.baseOffset, epoch, batch.records);
+            if (batch.records.isPresent()) {

Review comment:
   It would be nice if we didn't lose the exception for the leader change message. Perhaps one way to do this is to add `numRecords` as a separate field in `CompletedBatch` so that we always have it available. Then we could change this to something like the following:
   ```java
   future.whenComplete((commitTimeMs, exception) -> {
       if (exception != null) {
           logger.debug("Failed to commit {} records at {}", batch.numRecords, offsetAndEpoch, exception);
       } else {
           long elapsedTime = Math.max(0, commitTimeMs - appendTimeMs);
           double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
           kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
           logger.debug("Completed commit of {} records at {}", batch.numRecords, offsetAndEpoch);
           batch.records.ifPresent(records -> maybeFireHandleCommit(batch.baseOffset, epoch, records));
       }
   });
   ```
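   For readers following along, here is a minimal sketch of what that `CompletedBatch` change might look like, built only from the fields and constructor arguments visible in the hunks above. The `numRecords` field and its wiring are the assumed additions; in the real code this is a nested class of `BatchAccumulator`, shown standalone here for self-containment:
   ```java
   import java.nio.ByteBuffer;
   import java.util.List;
   import java.util.Optional;

   import org.apache.kafka.common.memory.MemoryPool;
   import org.apache.kafka.common.record.MemoryRecords;

   // Sketch of BatchAccumulator.CompletedBatch with the proposed field
   public class CompletedBatch<T> {
       public final long baseOffset;
       // Empty for control batches such as the leader change message
       public final Optional<List<T>> records;
       // Assumed new field: the record count survives even when `records`
       // is empty, so commit logging and metrics never lose it
       public final int numRecords;
       public final MemoryRecords data;
       private final MemoryPool pool;
       private final ByteBuffer initialBuffer;

       public CompletedBatch(
           long baseOffset,
           Optional<List<T>> records,
           int numRecords,
           MemoryRecords data,
           MemoryPool pool,
           ByteBuffer initialBuffer
       ) {
           this.baseOffset = baseOffset;
           this.records = records;
           this.numRecords = numRecords;
           this.data = data;
           this.pool = pool;
           this.initialBuffer = initialBuffer;
       }

       public void release() {
           // Return the backing buffer to the pool once the batch is done
           pool.release(initialBuffer);
       }
   }
   ```
   Regular batches would pass `records.size()` and `appendLeaderChangeMessage` would pass `1`, so the `whenComplete` callback above can log and record metrics without unwrapping the `Optional`.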
##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -65,6 +66,135 @@
         );
     }
 
+    @Test
+    public void testLeaderChangeMessageWritten() {
+        int leaderEpoch = 17;
+        long baseOffset = 0;
+        int lingerMs = 50;
+        int maxBatchSize = 512;
+
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        Mockito.when(memoryPool.tryAllocate(256))
+            .thenReturn(buffer);
+
+        BatchAccumulator<String> acc = buildAccumulator(
+            leaderEpoch,
+            baseOffset,
+            lingerMs,
+            maxBatchSize
+        );
+
+        acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds());
+
+        List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+        assertEquals(1, batches.size());
+
+        BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+        batch.release();
+        Mockito.verify(memoryPool).release(buffer);
+    }
+
+    @Test
+    public void testForceDrain() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+
+            // Append records
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            // Check that a drain is needed
+            time.sleep(lingerMs);

Review comment:
   Hmm, I think a better test would be to skip the linger wait time. That would show that calling `forceDrain` causes us to forgo the linger:
   ```java
   assertFalse(acc.needsDrain(time.milliseconds()));
   acc.forceDrain();
   assertTrue(acc.needsDrain(time.milliseconds()));
   assertEquals(0, acc.timeUntilDrain(time.milliseconds()));
   ```
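   To make that concrete, the reshaped tail of `testForceDrain` might read as follows. This is a sketch that simply splices the asserts above in place of the `time.sleep(lingerMs)` call, reusing the names from the diff:
   ```java
   // Without advancing the mock clock, the linger has not expired,
   // so nothing should be drainable yet
   assertFalse(acc.needsDrain(time.milliseconds()));

   // forceDrain should make the accumulator immediately drainable,
   // demonstrating that it bypasses the linger timeout
   acc.forceDrain();
   assertTrue(acc.needsDrain(time.milliseconds()));
   assertEquals(0, acc.timeUntilDrain(time.milliseconds()));

   // Draining should still yield the single batch built from the appends
   List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
   assertEquals(1, batches.size());
   ```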
##########
File path: raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
##########
@@ -208,11 +227,11 @@ public void testGetVoterStates() {
         ), state.getVoterEndOffsets());
     }
 
-    private LeaderState setUpLeaderAndFollowers(int follower1,
+    private LeaderState<?> setUpLeaderAndFollowers(int follower1,
                                                 int follower2,

Review comment:
   nit: misaligned

##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -194,14 +196,52 @@ private void completeCurrentBatch() {
         MemoryRecords data = currentBatch.build();
         completed.add(new CompletedBatch<>(
             currentBatch.baseOffset(),
-            currentBatch.records(),
+            Optional.of(currentBatch.records()),
             data,
             memoryPool,
             currentBatch.initialBuffer()
         ));
         currentBatch = null;
     }
 
+    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
+        appendLock.lock();
+        try {
+            forceDrain();
+            ByteBuffer buffer = memoryPool.tryAllocate(256);
+            if (buffer != null) {
+                MemoryRecords data = MemoryRecords.withLeaderChangeMessage(
+                    this.nextOffset,
+                    currentTimeMs,
+                    this.epoch,
+                    buffer,
+                    leaderChangeMessage);
+                completed.add(new CompletedBatch<>(
+                    nextOffset,
+                    Optional.empty(),
+                    data,
+                    memoryPool,
+                    buffer
+                ));
+                nextOffset += 1;
+            } else {
+                throw new IllegalStateException("Could not allocate buffer for the control batch.");

Review comment:
   nit: perhaps we can be more explicit and say "leader change record" instead of "control batch"

##########
File path: raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
##########
@@ -65,6 +66,135 @@
         );
     }
 
+    @Test
+    public void testLeaderChangeMessageWritten() {
+        int leaderEpoch = 17;
+        long baseOffset = 0;
+        int lingerMs = 50;
+        int maxBatchSize = 512;
+
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        Mockito.when(memoryPool.tryAllocate(256))
+            .thenReturn(buffer);
+
+        BatchAccumulator<String> acc = buildAccumulator(
+            leaderEpoch,
+            baseOffset,
+            lingerMs,
+            maxBatchSize
+        );
+
+        acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds());
+
+        List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+        assertEquals(1, batches.size());
+
+        BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+        batch.release();
+        Mockito.verify(memoryPool).release(buffer);
+    }
+
+    @Test
+    public void testForceDrain() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+
+            // Append records
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            // Check that a drain is needed
+            time.sleep(lingerMs);
+            assertTrue(acc.needsDrain(time.milliseconds()));
+
+            // Force a drain and check that the drain status is `FINISHED`
+            acc.forceDrain();
+            assertEquals(0, acc.timeUntilDrain(time.milliseconds()));
+
+            // Drain completed batches
+            List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
+
+            assertEquals(1, batches.size());
+            assertFalse(acc.needsDrain(time.milliseconds()));
+            assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
+
+            BatchAccumulator.CompletedBatch<String> batch = batches.get(0);
+            assertEquals(records, batch.records.get());
+            assertEquals(baseOffset, batch.baseOffset);
+        });
+    }
+
+    @Test
+    public void testForceDrainBeforeAppendLeaderChangeMessage() {
+        asList(APPEND, APPEND_ATOMIC).forEach(appender -> {
+            int leaderEpoch = 17;
+            long baseOffset = 157;
+            int lingerMs = 50;
+            int maxBatchSize = 512;
+
+            Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(ByteBuffer.allocate(maxBatchSize));
+            Mockito.when(memoryPool.tryAllocate(256))
+                .thenReturn(ByteBuffer.allocate(256));
+
+            BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            );
+
+            List<String> records = asList("a", "b", "c", "d", "e", "f", "g", "h", "i");
+
+            // Append records
+            assertEquals(baseOffset, appender.call(acc, leaderEpoch, records.subList(0, 1)));
+            assertEquals(baseOffset + 2, appender.call(acc, leaderEpoch, records.subList(1, 3)));
+            assertEquals(baseOffset + 5, appender.call(acc, leaderEpoch, records.subList(3, 6)));
+            assertEquals(baseOffset + 7, appender.call(acc, leaderEpoch, records.subList(6, 8)));
+            assertEquals(baseOffset + 8, appender.call(acc, leaderEpoch, records.subList(8, 9)));
+
+            // Check that a drain is needed
+            time.sleep(lingerMs);

Review comment:
   Similarly, can we skip the linger here? We want to ensure that the batch currently being built is finished immediately when we call `appendLeaderChangeMessage`.
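   As a sketch of that change, the sleep could be replaced with a direct call, on the assumption (per the `forceDrain()` call inside `appendLeaderChangeMessage` in the hunk above) that appending the control record finishes the in-progress batch on its own:
   ```java
   // No linger time has elapsed, so the pending batch is not yet drainable
   assertFalse(acc.needsDrain(time.milliseconds()));

   // Appending the leader change message should complete the current batch
   // immediately rather than waiting out the linger timeout
   acc.appendLeaderChangeMessage(new LeaderChangeMessage(), time.milliseconds());
   assertTrue(acc.needsDrain(time.milliseconds()));
   assertEquals(0, acc.timeUntilDrain(time.milliseconds()));
   ```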
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org