m1a2st commented on code in PR #19214: URL: https://github.com/apache/kafka/pull/19214#discussion_r2000915463
########## clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java: ########## @@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException { verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten)); } + /** + * Test two conditions + * 1. If the target offset equals the base offset of the first batch + * 2. If the target offset < the base offset of the first batch + */ + @ParameterizedTest + @ValueSource(longs = {5, 10}) + public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset) throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch.baseOffset()).thenReturn(baseOffset); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result); + verify(batch, never()).lastOffset(); + } + + @Test + public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchLastOffsetMatch() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch.baseOffset()).thenReturn(3L); + when(batch.lastOffset()).thenReturn(5L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result); + // The target offset equals the last offset of the batch, so we should call lastOffset + verify(batch, 
times(1)).lastOffset(); + } + + @Test + public void testSearchForOffsetWithSizeLastOffsetCallCountLastBatchLastOffsetMatch() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(prevBatch.baseOffset()).thenReturn(5L); + when(prevBatch.lastOffset()).thenReturn(12L); + FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(currentBatch.baseOffset()).thenReturn(15L); + when(currentBatch.lastOffset()).thenReturn(20L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, prevBatch, currentBatch); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(20L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch), result); + // Because the target offset is in the current batch, we should not call lastOffset in the previous batch + verify(prevBatch, never()).lastOffset(); + verify(currentBatch, times(1)).lastOffset(); + } + + @Test + public void testSearchForOffsetWithSizeLastOffsetCallCountPrevBatchMatches() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(prevBatch.baseOffset()).thenReturn(5L); + when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset + FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, prevBatch, currentBatch); + + FileRecords.LogOffsetPosition result = 
fileRecords.searchForOffsetWithSize(10L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result); + // Because the target offset is in the previous batch, we should call lastOffset + // on the previous batch + verify(prevBatch, times(1)).lastOffset(); + } + + @Test + public void testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchDoesntMatch() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); // < targetOffset + FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(8L); // < targetOffset + when(batch2.lastOffset()).thenReturn(9L); // < targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(10L, 0); + + assertNull(result); + // Because the base offsets of both batches are smaller than the target offset, + // we should call lastOffset only on batch2, the last batch + verify(batch1, never()).lastOffset(); + verify(batch2, times(1)).lastOffset(); + } + + /** + * Test two conditions + * 1. If the target offset > the base offset of the last batch + * 2. 
If the target offset equals the base offset of the last batch + */ + @ParameterizedTest + @ValueSource(longs = {8, 10}) + public void testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchMatches(long baseOffset) throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); // < targetOffset + FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset + when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + long targetOffset = 10L; + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(targetOffset, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result); + if (targetOffset == baseOffset) { + // Because the target offset is equal to the base offset of batch2, we should not call + // lastOffset on batch2 + verify(batch1, times(1)).lastOffset(); + verify(batch2, never()).lastOffset(); + } else { + // Because the target offset is in batch2, we should not call + // lastOffset on batch1 + verify(batch1, never()).lastOffset(); + verify(batch2, times(1)).lastOffset(); + } + } + + @Test + public void testSearchForOffsetWithSizeLastOffsetCallCountTargetBetweenTwoBatches() throws IOException { + File mockFile = mock(File.class); + FileChannel mockChannel = mock(FileChannel.class); + FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch1.baseOffset()).thenReturn(5L); + when(batch1.lastOffset()).thenReturn(10L); + 
FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); + when(batch2.baseOffset()).thenReturn(15L); + when(batch2.lastOffset()).thenReturn(20L); + + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + mockFileRecordBatches(fileRecords, batch1, batch2); + + FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(13L, 0); + + assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result); + // Because the target offset is between the two batches, we should call lastOffset on the batch1 + verify(batch1, times(1)).lastOffset(); + verify(batch2, never()).lastOffset(); + } + + private void mockFileRecordBatches(FileRecords fileRecords, FileLogInputStream.FileChannelRecordBatch... batch) { + List<FileLogInputStream.FileChannelRecordBatch> batches = new ArrayList<>(asList(batch)); Review Comment: You're right! Thanks for pointing that out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org