chia7712 commented on code in PR #18852:
URL: https://github.com/apache/kafka/pull/18852#discussion_r1966907188
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ##########
@@ -1802,10 +1804,31 @@ private boolean handleFetchResponse(
             }
         }

-    private void appendAsFollower(
-        Records records
-    ) {
-        LogAppendInfo info = log.appendAsFollower(records);
+    private static String convertToHexadecimal(Records records) {
+        ByteBuffer buffer = ((MemoryRecords) records).buffer();
+        byte[] bytes = new byte[Math.min(buffer.remaining(), DefaultRecordBatch.RECORD_BATCH_OVERHEAD)];
+        buffer.get(bytes);
+
+        return HexFormat.of().formatHex(bytes);
+    }
+
+    private void appendAsFollower(Records records) {
+        if (records.sizeInBytes() == 0) {
+            // Nothing to do if there are no bytes in the response
+            return;
+        }
+
+        try {
+            var info = log.appendAsFollower(records, quorum.epoch());
+            kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1);
+        } catch (CorruptRecordException | InvalidRecordException e) {
+            logger.info(

Review Comment:
   Pardon me, why not use the `error` level?
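As context for the hexadecimal logging above, here is a minimal, self-contained sketch of what `convertToHexadecimal` does: it renders at most one batch header's worth of bytes as hex, so a corrupt batch can be logged without dumping the entire fetch response. The 61-byte constant and the `duplicate()` call are assumptions of this sketch; the PR uses `DefaultRecordBatch.RECORD_BATCH_OVERHEAD` and reads the buffer directly.

    import java.nio.ByteBuffer;
    import java.util.HexFormat;

    public class BatchHeaderHexDump {
        // Assumption: the v2 record batch header is 61 bytes, matching
        // DefaultRecordBatch.RECORD_BATCH_OVERHEAD in Kafka's code base.
        private static final int RECORD_BATCH_OVERHEAD = 61;

        // Copy at most one batch header's worth of bytes and render them as hex.
        // duplicate() keeps the caller's buffer position untouched.
        static String headerAsHex(ByteBuffer buffer) {
            byte[] bytes = new byte[Math.min(buffer.remaining(), RECORD_BATCH_OVERHEAD)];
            buffer.duplicate().get(bytes);
            return HexFormat.of().formatHex(bytes);
        }

        public static void main(String[] args) {
            // A short, obviously truncated "batch": an 8-byte offset plus one byte
            ByteBuffer corrupt = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 4, -46, -1});
            System.out.println(headerAsHex(corrupt)); // prints 00000000000004d2ff
        }
    }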
########## clients/src/test/java/org/apache/kafka/common/record/InvalidMemoryRecordsProvider.java: ##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+public final class InvalidMemoryRecordsProvider implements ArgumentsProvider {
+    // Use a baseOffset that's not zero so that it is less likely to match the LEO
+    private static final long BASE_OFFSET = 1234;
+    private static final int EPOCH = 4321;
+
+    /**
+     * Returns a stream of arguments for invalid memory records and the expected exception.
+     *
+     * The first object in the {@code Arguments} is a {@code MemoryRecords}.
+     *
+     * The second object in the {@code Arguments} is an {@code Optional<Class<Exception>>} which is
+     * the expected exception from the log layer.
+     */
+    @Override
+    public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+        return Stream.of(
+            Arguments.of(MemoryRecords.readableRecords(notEnoughtBytes()), Optional.empty()),
+            Arguments.of(MemoryRecords.readableRecords(recordsSizeTooSmall()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(notEnoughBytesToMagic()), Optional.empty()),
+            Arguments.of(MemoryRecords.readableRecords(negativeMagic()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(largeMagic()), Optional.of(CorruptRecordException.class)),
+            Arguments.of(MemoryRecords.readableRecords(lessBytesThanRecordSize()), Optional.empty())
+        );
+    }
+
+    private static ByteBuffer notEnoughtBytes() {

Review Comment:
   typo: `Enought` -> `Enough`
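The generator bodies are truncated in the diff above, so as a rough illustration only (not the PR's actual implementation), the first two cases could be built along these lines, where the 12-byte offset-plus-size log overhead is an assumption taken from the record format:

    import java.nio.ByteBuffer;

    public class InvalidBuffersSketch {
        // 8-byte base offset + 4-byte size prefix, per the on-disk record format
        private static final int LOG_OVERHEAD = 12;

        // Too short to even hold the offset+size prefix: the log layer should
        // treat this as an incomplete batch (Optional.empty() above), not corruption.
        static ByteBuffer notEnoughBytes() {
            return ByteBuffer.allocate(LOG_OVERHEAD - 1);
        }

        // A readable prefix whose size field claims fewer bytes than any legal
        // batch can contain, which should surface as a CorruptRecordException.
        static ByteBuffer recordsSizeTooSmall() {
            ByteBuffer buffer = ByteBuffer.allocate(LOG_OVERHEAD);
            buffer.putLong(1234L);           // non-zero base offset, as in the provider
            buffer.putInt(LOG_OVERHEAD - 1); // claimed batch size: impossibly small
            buffer.flip();
            return buffer;
        }
    }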
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ##########
@@ -1086,63 +1088,79 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     var shallowOffsetOfMaxTimestamp = -1L
     var readFirstMessage = false
     var lastOffsetOfFirstBatch = -1L
+    var skipRemainingBatches = false

     records.batches.forEach { batch =>
       if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != leaderEpoch) {
         throw new InvalidRecordException("Append from Raft leader did not set the batch epoch correctly")
       }
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0) {
         throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
-
-      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
-      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
-      // For magic version 2, we can get the first offset directly from the batch header.
-      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
-      // case, validation will be more lenient.
-      // Also indicate whether we have the accurate first offset or not
-      if (!readFirstMessage) {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = batch.baseOffset
-        lastOffsetOfFirstBatch = batch.lastOffset
-        readFirstMessage = true
       }

-      // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
-        monotonic = false
-
-      // update the last offset seen
-      lastOffset = batch.lastOffset
-      lastLeaderEpoch = batch.partitionLeaderEpoch
-
-      // Check if the message sizes are valid.
-      val batchSize = batch.sizeInBytes
-      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
-        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
-          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
-      }
+      /* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
+       * leadership. This can happend if sending FETCH responses is slowed because there is a race between sending the FETCH
+       * response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
+       * persisting up to the partition leader epoch of the leader when the FETCH request was handled. See KAFKA-18723 for
+       * more details.
+       */
+      skipRemainingBatches = skipRemainingBatches || hasInvalidPartitionLeaderEpoch(batch, origin, leaderEpoch);
+      if (skipRemainingBatches) {
+        info(s"Skipping batch $batch because origin is $origin and leader epoch is $leaderEpoch")

Review Comment:
   Maybe we can add "higher leader epoch" to the log message? Otherwise, the log message does not state the true reason for skipping.
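For what it's worth, the diff only shows the call site of the new predicate; a hypothetical sketch of the check (names and package locations are assumptions, not the PR's actual code) would be: only replication appends are affected, batches that carry no epoch are never skipped, and a batch is skipped once its epoch is newer than the epoch the fetch was issued with.

    import org.apache.kafka.common.record.RecordBatch;
    // Assumed location; AppendOrigin lives in the storage module in recent Kafka.
    import org.apache.kafka.storage.internals.log.AppendOrigin;

    final class EpochCheckSketch {
        // Sketch of hasHigherPartitionLeaderEpoch(batch, origin, leaderEpoch):
        // skip only replicated data, keep batches without an epoch, and skip
        // once a batch's epoch exceeds the fetcher's current leader epoch.
        static boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
            return origin == AppendOrigin.REPLICATION
                && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH
                && batch.partitionLeaderEpoch() > leaderEpoch;
        }
    }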
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ##########
@@ -1085,63 +1087,81 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     var shallowOffsetOfMaxTimestamp = -1L
     var readFirstMessage = false
     var lastOffsetOfFirstBatch = -1L
+    var skipRemainingBatches = false

     records.batches.forEach { batch =>
       if (origin == AppendOrigin.RAFT_LEADER && batch.partitionLeaderEpoch != leaderEpoch) {
-        throw new InvalidRecordException("Append from Raft leader did not set the batch epoch correctly")
+        throw new InvalidRecordException(
+          s"Append from Raft leader did not set the batch epoch correctly, expected $leaderEpoch " +
+          s"but the batch has ${batch.partitionLeaderEpoch}"
+        )
       }
       // we only validate V2 and higher to avoid potential compatibility issues with older clients
-      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0)
+      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset != 0) {
         throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
           s"be 0, but it is ${batch.baseOffset}")
-
-      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
-      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
-      // For magic version 2, we can get the first offset directly from the batch header.
-      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
-      // case, validation will be more lenient.
-      // Also indicate whether we have the accurate first offset or not
-      if (!readFirstMessage) {
-        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-          firstOffset = batch.baseOffset
-        lastOffsetOfFirstBatch = batch.lastOffset
-        readFirstMessage = true
       }

-      // check that offsets are monotonically increasing
-      if (lastOffset >= batch.lastOffset)
-        monotonic = false
-
-      // update the last offset seen
-      lastOffset = batch.lastOffset
-      lastLeaderEpoch = batch.partitionLeaderEpoch
-
-      // Check if the message sizes are valid.
-      val batchSize = batch.sizeInBytes
-      if (!ignoreRecordSize && batchSize > config.maxMessageSize) {
-        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
-        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
-        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
-          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
-      }
+      /* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
+       * leadership. This can happen if sending FETCH responses is slow. There is a race between sending the FETCH
+       * response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
+       * persisting up to the current leader epoch used in the fetch request. See KAFKA-18723 for more details.
+       */
+      skipRemainingBatches = skipRemainingBatches || hasHigherPartitionLeaderEpoch(batch, origin, leaderEpoch);
+      if (skipRemainingBatches) {

Review Comment:
   Should we update `invalidMessageCrcRecordsPerSec` to count this as an invalid batch?

########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ##########
@@ -1904,9 +1988,94 @@ class UnifiedLogTest {

     val log = createLog(logDir, LogTestUtils.createLogConfig(maxMessageBytes = second.sizeInBytes - 1))

-    log.appendAsFollower(first)
+    log.appendAsFollower(first, Int.MaxValue)

     // the second record is larger then limit but appendAsFollower does not validate the size.
-    log.appendAsFollower(second)
+    log.appendAsFollower(second, Int.MaxValue)
+  }
+
+  @ParameterizedTest
+  @ArgumentsSource(classOf[InvalidMemoryRecordsProvider])
+  def testInvalidMemoryRecords(records: MemoryRecords, expectedException: Optional[Class[Exception]]): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val previousEndOffset = log.logEndOffsetMetadata.messageOffset
+
+    if (expectedException.isPresent()) {
+      assertThrows(
+        expectedException.get(),
+        () => log.appendAsFollower(records, Int.MaxValue)
+      );

Review Comment:
   The `;` is unnecessary.
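For reference, a Java equivalent of the Scala test wiring above might look like the sketch below; the `log` fixture and the `appendAsFollower(records, epoch)` signature are assumed from the surrounding test class.

    import java.util.Optional;

    import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ArgumentsSource;

    import static org.junit.jupiter.api.Assertions.assertThrows;

    class InvalidMemoryRecordsSketchTest {
        private UnifiedLog log; // assumed fixture, created in setup elsewhere

        @ParameterizedTest
        @ArgumentsSource(InvalidMemoryRecordsProvider.class)
        void testInvalidMemoryRecords(MemoryRecords records, Optional<Class<Exception>> expectedException) {
            if (expectedException.isPresent()) {
                // Corrupt batches must be rejected by the log layer
                assertThrows(expectedException.get(), () -> log.appendAsFollower(records, Integer.MAX_VALUE));
            } else {
                // Incomplete batches are ignored; the append is a no-op
                log.appendAsFollower(records, Integer.MAX_VALUE);
            }
        }
    }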