mimaison commented on code in PR #21644:
URL: https://github.com/apache/kafka/pull/21644#discussion_r2895781744


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -622,6 +642,988 @@ public void 
testFirstUnstableOffsetWithTransactionalData() throws IOException {
         assertEquals(Optional.empty(), log.firstUnstableOffset());
     }
 
+    @Test
+    public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        MemoryRecords records = LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ));
+
+        log.appendAsLeader(records, 0);
+        assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 
1L, 2L));
+
+        log.roll();
+        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 
1L, 2L));
+
+        log.appendAsLeader(records, 0);
+        assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+    }
+
+    private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset, 
int expectedSize, List<Long> expectedOffsets) throws IOException {
+        FetchDataInfo readInfo = log.read(
+                fetchOffset,
+                2048,
+                FetchIsolation.HIGH_WATERMARK,
+                false);
+        assertEquals(expectedSize, readInfo.records.sizeInBytes());
+        List<Long> actualOffsets = new ArrayList<>();
+        readInfo.records.records().forEach(record -> 
actualOffsets.add(record.offset()));
+        assertEquals(expectedOffsets, actualOffsets);
+    }
+
+    @Test
+    public void testAppendAsLeaderWithRaftLeader() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        int leaderEpoch = 0;
+
+        Function<Long, MemoryRecords> records = offset -> 
LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+        log.appendAsLeader(records.apply(0L), leaderEpoch, 
AppendOrigin.RAFT_LEADER);
+        assertEquals(0, log.logStartOffset());
+        assertEquals(3L, log.logEndOffset());
+
+        // Since raft leader is responsible for assigning offsets, and the 
LogValidator is bypassed from the performance perspective,
+        // so the first offset of the MemoryRecords to be appended should 
equal to the next offset in the log
+        assertThrows(UnexpectedAppendOffsetException.class, () -> 
log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+        // When the first offset of the MemoryRecords to be appended equals to 
the next offset in the log, append will succeed
+        log.appendAsLeader(records.apply(3L), leaderEpoch, 
AppendOrigin.RAFT_LEADER);
+        assertEquals(6, log.logEndOffset());
+    }
+
+    @Test
+    public void testAppendInfoFirstOffset() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        List<SimpleRecord> simpleRecords = List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        );
+
+        MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(0, firstAppendInfo.firstOffset());
+
+        LogAppendInfo secondAppendInfo = log.appendAsLeader(
+                LogTestUtils.records(simpleRecords),
+                0
+        );
+        assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+        log.roll();
+        LogAppendInfo afterRollAppendInfo =  
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+        assertEquals(simpleRecords.size() * 2, 
afterRollAppendInfo.firstOffset());
+    }
+
+    @Test
+    public void testTruncateBelowFirstUnstableOffset() throws IOException {
+        testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+    }
+
+    @Test
+    public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws 
IOException {
+        testTruncateBelowFirstUnstableOffset((log, targetOffset) -> 
log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+    }
+
+    @Test
+    public void testTruncateFullyAndStart() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long producerId = 17L;
+        short producerEpoch = 10;
+        int sequence = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+                Compression.NONE,
+                producerId,
+                producerEpoch,
+                sequence,
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        ), 0);
+        assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+        // We close and reopen the log to ensure that the first unstable 
offset segment
+        // position will be undefined when we truncate the log.
+        log.close();
+
+        UnifiedLog reopened = createLog(logDir, logConfig);
+        assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager().firstUnstableOffset());
+
+        reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
+        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+        assertEquals(Map.of(), 
reopened.producerStateManager().activeProducers());
+        assertEquals(1L, reopened.logStartOffset());
+        assertEquals(2L, reopened.logEndOffset());
+    }
+
+    private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog, 
Long> truncateFunc) throws IOException {
+        // Verify that truncation below the first unstable offset correctly
+        // resets the producer state. Specifically we are testing the case when
+        // the segment position of the first unstable offset is unknown.
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long producerId = 17L;
+        short producerEpoch = 10;
+        int sequence = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+                Compression.NONE,
+                producerId,
+                producerEpoch,
+                sequence,
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        ), 0);
+        assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+        // We close and reopen the log to ensure that the first unstable 
offset segment
+        // position will be undefined when we truncate the log.
+        log.close();
+
+        UnifiedLog reopened = createLog(logDir, logConfig);
+        assertEquals(Optional.of(new LogOffsetMetadata(3L)), 
reopened.producerStateManager().firstUnstableOffset());
+
+        truncateFunc.accept(reopened, 0L);
+        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+        assertEquals(Map.of(), 
reopened.producerStateManager().activeProducers());
+    }
+
+    @Test
+    public void testHighWatermarkMaintenance() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        int leaderEpoch = 0;
+
+        Function<Long, MemoryRecords> records = offset -> 
LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), 
"value".getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), 
"value".getBytes())
+        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, 
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+        // High watermark initialized to 0
+        assertHighWatermark(log, 0L);
+
+        // High watermark not changed by append
+        log.appendAsLeader(records.apply(0L), leaderEpoch);
+        assertHighWatermark(log, 0L);
+
+        // Update high watermark as leader
+        log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
+        assertHighWatermark(log, 1L);
+
+        // Cannot update past the log end offset
+        log.updateHighWatermark(5L);
+        assertHighWatermark(log, 3L);
+
+        // Update high watermark as follower
+        log.appendAsFollower(records.apply(3L), leaderEpoch);
+        log.updateHighWatermark(6L);
+        assertHighWatermark(log, 6L);
+
+        // High watermark should be adjusted by truncation
+        log.truncateTo(3L);
+        assertHighWatermark(log, 3L);
+
+        log.appendAsLeader(records.apply(0L), 0);
+        assertHighWatermark(log, 3L);
+        assertEquals(6L, log.logEndOffset());
+        assertEquals(0L, log.logStartOffset());
+
+        // Full truncation should also reset high watermark
+        log.truncateFullyAndStartAt(4L, Optional.empty());
+        assertEquals(4L, log.logEndOffset());
+        assertEquals(4L, log.logStartOffset());
+        assertHighWatermark(log, 4L);
+    }
+
+    private void assertHighWatermark(UnifiedLog log, long offset) throws 
IOException {
+        assertEquals(offset, log.highWatermark());
+        assertValidLogOffsetMetadata(log, 
log.fetchOffsetSnapshot().highWatermark());
+    }
+
+    private void assertNonEmptyFetch(UnifiedLog log, long offset, 
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+        FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE, 
isolation, true);
+
+        assertFalse(readInfo.firstEntryIncomplete);
+        assertTrue(readInfo.records.sizeInBytes() > 0);
+
+        long upperBoundOffset = switch (isolation) {
+            case LOG_END -> log.logEndOffset();
+            case HIGH_WATERMARK -> log.highWatermark();
+            case TXN_COMMITTED -> log.lastStableOffset();
+        };
+
+        for (Record record : readInfo.records.records())
+            assertTrue(record.offset() < upperBoundOffset);
+
+        assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
+        assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+    }
+
+    private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation 
isolation, long batchBaseOffset) throws IOException {
+        FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE, 
isolation, true);
+        assertFalse(readInfo.firstEntryIncomplete);
+        assertEquals(0, readInfo.records.sizeInBytes());
+        assertEquals(batchBaseOffset, 
readInfo.fetchOffsetMetadata.messageOffset);
+        assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+    }
+
+    @Test
+    public void testFetchUpToLogEndOffset() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        )), 0);
+        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+        for (long offset = log.logStartOffset(); offset < log.logEndOffset(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END, 
batchBaseOffset);
+        }
+    }
+
+    @Test
+    public void testFetchUpToHighWatermark() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("0".getBytes()),
+                new SimpleRecord("1".getBytes()),
+                new SimpleRecord("2".getBytes())
+        )), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(
+                new SimpleRecord("3".getBytes()),
+                new SimpleRecord("4".getBytes())
+        )), 0);
+        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+        log.updateHighWatermark(3L);
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+        log.updateHighWatermark(5L);
+        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+    }
+
+    private void assertHighWatermarkBoundedFetches(UnifiedLog log, 
TreeSet<Long> batchBaseOffsets) throws IOException {
+        for (long offset = log.logStartOffset(); offset < log.highWatermark(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset);
+        }
+
+        for (long offset = log.highWatermark(); offset < log.logEndOffset(); 
offset++) {
+            Long batchBaseOffset = batchBaseOffsets.floor(offset);
+            assertNotNull(batchBaseOffset);
+            assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, 
batchBaseOffset);

Review Comment:
   Very good catch! Thanks
   I've addressed your comments and checked all other `for` loops. I think you 
highlighted all the problematic ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to