junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546966401
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##########
@@ -259,7 +259,7 @@ public String toString() {
         ", lastOffset=" + lastOffset +
         ", lastLeaderEpoch=" + lastLeaderEpoch +
         ", maxTimestamp=" + maxTimestamp +
-        ", offsetOfMaxTimestamp=" + offsetOfMaxTimestamp +
+        ", offsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +

Review Comment:
   offsetOfMaxTimestamp => shallowOffsetOfMaxTimestamp

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -68,17 +68,20 @@ public static class ValidationResult {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long offsetOfMaxTimestampMs;
+        // we only maintain batch level offset for max timestamp since we want to align the behavior of updating time
+        // indexing entries. The paths of follower sync and replica recovery do not iterate all records, so they have no

Review Comment:
   follower sync => follower append

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -275,7 +278,7 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
             if (batch.magic() > RecordBatch.MAGIC_VALUE_V0 && maxBatchTimestamp > maxTimestamp) {
                 maxTimestamp = maxBatchTimestamp;
-                offsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;
+                shallowOffsetOfMaxTimestamp = offsetOfMaxBatchTimestamp;

Review Comment:
   We need to set shallowOffsetOfMaxTimestamp to the last offset in the batch. Do we have test cases covering that?
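The last comment asks for batch-level ("shallow") tracking: when a batch raises the running max timestamp, record that batch's last offset rather than the offset of the individual record that carried the max. The sketch below illustrates that rule under stated assumptions: `Batch` is a hypothetical, simplified stand-in for Kafka's `RecordBatch`, the magic constants copy the values of `RecordBatch.MAGIC_VALUE_V0` and `MAGIC_VALUE_V2`, and the class and method names are illustrative only, not Kafka code.

```java
import java.util.List;

public final class ShallowMaxTimestampSketch {
    // Values mirror RecordBatch.MAGIC_VALUE_V0 / MAGIC_VALUE_V2 in Kafka.
    static final byte MAGIC_VALUE_V0 = 0;
    static final byte MAGIC_VALUE_V2 = 2;
    // Assumed sentinel, mirroring RecordBatch.NO_TIMESTAMP (-1).
    static final long NO_TIMESTAMP = -1L;

    // Hypothetical, simplified view of a validated batch (not Kafka's RecordBatch).
    static final class Batch {
        final byte magic;
        final long lastOffset;
        final long maxTimestamp;

        Batch(byte magic, long lastOffset, long maxTimestamp) {
            this.magic = magic;
            this.lastOffset = lastOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    /** Returns {maxTimestamp, shallowOffsetOfMaxTimestamp}; the offset stays -1 if no batch carried a timestamp. */
    static long[] track(List<Batch> batches) {
        long maxTimestamp = NO_TIMESTAMP;
        long shallowOffsetOfMaxTimestamp = -1L;
        for (Batch batch : batches) {
            // V0 batches carry no timestamps, so they never update the max.
            if (batch.magic > MAGIC_VALUE_V0 && batch.maxTimestamp > maxTimestamp) {
                maxTimestamp = batch.maxTimestamp;
                // Batch level: point at the batch's last offset, not at the record holding the max.
                shallowOffsetOfMaxTimestamp = batch.lastOffset;
            }
        }
        return new long[] {maxTimestamp, shallowOffsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] result = track(List.of(
                new Batch(MAGIC_VALUE_V2, 4L, 100L),    // offsets 0..4
                new Batch(MAGIC_VALUE_V2, 9L, 250L),    // offsets 5..9, holds the max
                new Batch(MAGIC_VALUE_V2, 14L, 200L))); // offsets 10..14
        System.out.println(result[0] + " " + result[1]); // prints "250 9"
    }
}
```

A test for the reviewer's concern would place the max timestamp in a record other than the batch's last one and expect the batch's last offset back, as the second batch in the example does if its max comes from, say, offset 6.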
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java:
##########
@@ -117,7 +117,7 @@ public LogAppendInfo(long firstOffset,
         this.lastOffset = lastOffset;
         this.lastLeaderEpoch = lastLeaderEpoch;
         this.maxTimestamp = maxTimestamp;
-        this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
+        this.shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;

Review Comment:
   Could we rename offsetOfMaxTimestamp in the input param and the javadoc accordingly? It would be useful to rename the local val in `UnifiedLog.analyzeAndValidateRecords` too.

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -293,14 +296,20 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
+            if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                shallowOffsetOfMaxTimestamp = offsetCounter.value - 1;
+            else
+                // case 1: Those single-record batches have same max timestamp, so the initial offset is equal with
+                // the last offset of earliest batch
+                shallowOffsetOfMaxTimestamp = initialOffset;

Review Comment:
   For MAGIC_VALUE_V0, shallowOffsetOfMaxTimestamp should be -1.
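Taken together, the `LOG_APPEND_TIME` hunk and the final comment imply three cases keyed on `toMagic`. A minimal sketch, assuming a hypothetical helper `shallowOffsetOfMaxTimestamp` that receives the post-assignment counter value (`offsetCounter.value` in the diff) as a plain `long`; the magic constants copy Kafka's `RecordBatch.MAGIC_VALUE_*` values. Note that the V0 branch returns -1 as the reviewer requests, which the quoted diff does not yet do.

```java
public final class LogAppendTimeOffsetSketch {
    // Values mirror RecordBatch.MAGIC_VALUE_V0/V1/V2 in Kafka.
    static final byte MAGIC_VALUE_V0 = 0;
    static final byte MAGIC_VALUE_V1 = 1;
    static final byte MAGIC_VALUE_V2 = 2;

    /**
     * Hypothetical helper: batch-level offset of the max timestamp when every
     * record is stamped with the same log-append time.
     *
     * @param toMagic       target magic of the converted records
     * @param initialOffset first offset assigned in this append
     * @param nextOffset    offset counter value after assignment (last offset + 1)
     */
    static long shallowOffsetOfMaxTimestamp(byte toMagic, long initialOffset, long nextOffset) {
        if (toMagic >= MAGIC_VALUE_V2) {
            // case 0: a single batch, so its last offset is nextOffset - 1
            return nextOffset - 1;
        } else if (toMagic == MAGIC_VALUE_V1) {
            // case 1: single-record batches share one timestamp, so the
            // earliest batch's last offset equals the initial offset
            return initialOffset;
        } else {
            // V0 has no timestamps, so there is no offset to report (per the review comment)
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Offsets 10..14 were assigned, so the counter ends at 15.
        System.out.println(shallowOffsetOfMaxTimestamp(MAGIC_VALUE_V2, 10L, 15L)); // 14
        System.out.println(shallowOffsetOfMaxTimestamp(MAGIC_VALUE_V1, 10L, 15L)); // 10
        System.out.println(shallowOffsetOfMaxTimestamp(MAGIC_VALUE_V0, 10L, 15L)); // -1
    }
}
```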