junrao commented on code in PR #18012:
URL: https://github.com/apache/kafka/pull/18012#discussion_r1906137724
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##########
@@ -257,13 +257,21 @@ public void append(long largestOffset,
         if (largestTimestampMs > maxTimestampSoFar()) {
             maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
         }
-        // append an entry to the index (if needed)
+        // append an entry to the timestamp index at MemoryRecords level (if needed)
         if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-            offsetIndex().append(largestOffset, physicalPosition);
             timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-            bytesSinceLastIndexEntry = 0;
         }
-        bytesSinceLastIndexEntry += records.sizeInBytes();
+
+        // append an entry to the offset index at batches level (if needed)
+        for (RecordBatch batch : records.batches()) {
+            if (bytesSinceLastIndexEntry > indexIntervalBytes &&
+                batch.lastOffset() >= offsetIndex().lastOffset()) {

Review Comment:
This is unnecessary. On the leader side, we call `LogValidator.validateBatch`, which makes sure each batch has at least one record.
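A toy model of the reasoning in this comment (a sketch, not Kafka code). It assumes, as the reviewer states, that every batch holds at least one record, and additionally assumes batches arrive with contiguous, increasing offsets. `Batch`, `ToyOffsetIndex`, and `indexAll` are hypothetical stand-ins rather than Kafka's `RecordBatch` or `OffsetIndex`, and for simplicity every batch is indexed instead of only those past `indexIntervalBytes`. Because a non-empty batch's last offset is at least its base offset, which already lies beyond the previous batch's last offset, appends to an index that only accepts strictly increasing offsets succeed without any `batch.lastOffset() >= offsetIndex().lastOffset()` guard.

```java
import java.util.ArrayList;
import java.util.List;

public class NonEmptyBatchSketch {
    // Hypothetical stand-in for a record batch: only the offsets matter here.
    record Batch(long baseOffset, int recordCount) {
        Batch {
            // Models the guarantee the reviewer attributes to leader-side validation.
            if (recordCount < 1) throw new IllegalArgumentException("empty batch");
        }

        long lastOffset() { return baseOffset + recordCount - 1; }
    }

    // Hypothetical toy index that, like a sparse offset index, rejects non-increasing offsets.
    static final class ToyOffsetIndex {
        final List<Long> offsets = new ArrayList<>();

        long lastOffset() { return offsets.isEmpty() ? -1L : offsets.get(offsets.size() - 1); }

        void append(long offset) {
            if (!offsets.isEmpty() && offset <= lastOffset())
                throw new IllegalStateException("non-increasing offset " + offset);
            offsets.add(offset);
        }
    }

    // Builds contiguous batches starting at offset 0 and indexes each batch's last offset
    // with no lastOffset guard. Returns the indexed offsets.
    static List<Long> indexAll(int[] recordCounts) {
        ToyOffsetIndex index = new ToyOffsetIndex();
        long nextOffset = 0;
        for (int count : recordCounts) {
            Batch batch = new Batch(nextOffset, count);
            // Non-empty batch => lastOffset() >= baseOffset > previous lastOffset.
            index.append(batch.lastOffset());
            nextOffset = batch.lastOffset() + 1;
        }
        return index.offsets;
    }

    public static void main(String[] args) {
        System.out.println(indexAll(new int[] {3, 1, 5})); // prints [2, 3, 8]
    }
}
```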
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##########
@@ -257,13 +257,21 @@ public void append(long largestOffset,
         if (largestTimestampMs > maxTimestampSoFar()) {
             maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
         }
-        // append an entry to the index (if needed)
+        // append an entry to the timestamp index at MemoryRecords level (if needed)
         if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-            offsetIndex().append(largestOffset, physicalPosition);
             timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-            bytesSinceLastIndexEntry = 0;
         }
-        bytesSinceLastIndexEntry += records.sizeInBytes();
+
+        // append an entry to the offset index at batches level (if needed)
+        for (RecordBatch batch : records.batches()) {
+            if (bytesSinceLastIndexEntry > indexIntervalBytes &&
+                batch.lastOffset() >= offsetIndex().lastOffset()) {
+                offsetIndex().append(batch.lastOffset(), physicalPosition);

Review Comment:
Currently, every time we append an entry to the offset index, we also append an entry to the timestamp index. It would be useful to keep it that way.

--
This is an automated message from the Apache Git Service.
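One possible arrangement that keeps the two indexes in step, sketched with hypothetical types rather than Kafka's classes and not the PR's actual change: the byte-interval check runs per batch, and whenever it fires, an offset-index entry and a timestamp-index entry are written together before the counter is reset. `Batch`, `OffsetEntry`, `TimeEntry`, and `PairedIndexSketch` are illustrative names. The sketch always appends both entries; it does not model whatever filtering the real `timeIndex().maybeAppend(...)` applies, nor the on-disk index format.

```java
import java.util.ArrayList;
import java.util.List;

public class PairedIndexSketch {
    // Hypothetical batch: last offset, max timestamp, and serialized size in bytes.
    record Batch(long lastOffset, long maxTimestamp, int sizeInBytes) {}

    // Hypothetical in-memory index entries (not Kafka's on-disk format).
    record OffsetEntry(long offset, int position) {}
    record TimeEntry(long timestamp, long offset) {}

    final List<OffsetEntry> offsetIndex = new ArrayList<>();
    final List<TimeEntry> timeIndex = new ArrayList<>();
    private final int indexIntervalBytes;
    private int bytesSinceLastIndexEntry = 0;
    private long maxTimestampSoFar = -1L;
    private long offsetOfMaxTimestampSoFar = -1L;

    PairedIndexSketch(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    // Appends batches laid out from physicalPosition onward. When the byte interval has been
    // exceeded, both indexes receive an entry at the same batch, then the counter is reset.
    void append(List<Batch> batches, int physicalPosition) {
        int position = physicalPosition;
        for (Batch batch : batches) {
            if (batch.maxTimestamp() > maxTimestampSoFar) {
                maxTimestampSoFar = batch.maxTimestamp();
                offsetOfMaxTimestampSoFar = batch.lastOffset();
            }
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                offsetIndex.add(new OffsetEntry(batch.lastOffset(), position));
                timeIndex.add(new TimeEntry(maxTimestampSoFar, offsetOfMaxTimestampSoFar));
                bytesSinceLastIndexEntry = 0;
            }
            position += batch.sizeInBytes();
            bytesSinceLastIndexEntry += batch.sizeInBytes();
        }
    }

    public static void main(String[] args) {
        PairedIndexSketch segment = new PairedIndexSketch(100);
        segment.append(List.of(
                new Batch(0, 10, 60), new Batch(1, 30, 60),
                new Batch(2, 20, 60), new Batch(3, 40, 60)), 0);
        System.out.println(segment.offsetIndex); // [OffsetEntry[offset=2, position=120]]
        System.out.println(segment.timeIndex);   // [TimeEntry[timestamp=30, offset=1]]
    }
}
```

Tracking the maximum timestamp per batch, rather than once for the whole `MemoryRecords`, is what lets the time entry be written at the same point as the offset entry in this sketch.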