junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546966401


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##########
@@ -259,7 +259,7 @@ public String toString() {
                 ", lastOffset=" + lastOffset +
                 ", lastLeaderEpoch=" + lastLeaderEpoch +
                 ", maxTimestamp=" + maxTimestamp +
-                ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp +
+                ", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +

Review Comment:
   offsetOfMaxTimestamp => shallowOffsetOfMaxTimestamp



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -68,17 +68,20 @@ public static class ValidationResult {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long offsetOfMaxTimestampMs;
+        // we only maintain batch level offset for max timestamp since we want 
to align the behavior of updating time
+        // indexing entries. The paths of follower sync and replica recovery 
do not iterate all records, so they have no

Review Comment:
   follower sync => follower append



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef 
offsetCounter,
 
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && 
maxBatchTimestamp > maxTimestamp) {
                 maxTimestamp = maxBatchTimestamp;
-                offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+                shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
   We need to set shallowOffsetOfMaxTimestamp to the last offset in the batch. 
Do we have test cases covering that?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##########
@@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset,
         this.lastOffset = lastOffset;
         this.lastLeaderEpoch = lastLeaderEpoch;
         this.maxTimestamp = maxTimestamp;
-        this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
+        this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;

Review Comment:
   Could we rename offsetOfMaxTimestamp in the input param and the javadoc 
accordingly? It would be useful to rename the local val in 
`UnifiedLog.analyzeAndValidateRecords` too.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -293,14 +296,20 @@ public ValidationResult 
assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
+            else
+                // case 1: Those single-record batches have same max 
timestamp, so the initial offset is equal with
+                // the last offset of earliest batch
+                shallowOffsetOfMaxTimestamp = initialOffset;

Review Comment:
   For MAGIC_VALUE_V0, shallowOffsetOfMaxTimestamp should be -1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to