junrao commented on code in PR #18012:
URL: https://github.com/apache/kafka/pull/18012#discussion_r1906137724


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -257,13 +257,21 @@ public void append(long largestOffset,
             if (largestTimestampMs > maxTimestampSoFar()) {
                 maxTimestampAndOffsetSoFar = new 
TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
             }
-            // append an entry to the index (if needed)
+            // append an entry to the timestamp index at MemoryRecords level 
(if needed)
             if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
                 timeIndex().maybeAppend(maxTimestampSoFar(), 
shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
             }
-            bytesSinceLastIndexEntry += records.sizeInBytes();
+
+            // append an entry to the offset index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (bytesSinceLastIndexEntry > indexIntervalBytes &&
+                    batch.lastOffset() >= offsetIndex().lastOffset()) {

Review Comment:
   This is unnecessary. On the leader side, we call 
`LogValidator.validateBatch`, which makes sure each batch has at least one 
record.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -257,13 +257,21 @@ public void append(long largestOffset,
             if (largestTimestampMs > maxTimestampSoFar()) {
                 maxTimestampAndOffsetSoFar = new 
TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
             }
-            // append an entry to the index (if needed)
+            // append an entry to the timestamp index at MemoryRecords level 
(if needed)
             if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
                 timeIndex().maybeAppend(maxTimestampSoFar(), 
shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
             }
-            bytesSinceLastIndexEntry += records.sizeInBytes();
+
+            // append an entry to the offset index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (bytesSinceLastIndexEntry > indexIntervalBytes &&
+                    batch.lastOffset() >= offsetIndex().lastOffset()) {
+                    offsetIndex().append(batch.lastOffset(), physicalPosition);

Review Comment:
   Currently, every time we append an entry to the offset index, we also append 
an entry to the timestamp index. It would be useful to keep it that way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to