clolov commented on code in PR #15060:
URL: https://github.com/apache/kafka/pull/15060#discussion_r1435282790


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2161,6 +2167,86 @@ RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offse
         }
     }
 
+    @Test
+    public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        RemoteLogInputStream remoteLogInputStream = mock(RemoteLogInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        int fetchOffset = 0;
+        int fetchMaxBytes = 10;
+        int recordBatchSizeInBytes = fetchMaxBytes + 1;
+        RecordBatch firstBatch = mock(RecordBatch.class);
+        ArgumentCaptor<ByteBuffer> capture = ArgumentCaptor.forClass(ByteBuffer.class);
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
+        );
+
+        when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));

Review Comment:
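   Isn't one `Optional.of(segmentMetadata)` enough? Mockito keeps returning the
   last value passed to `thenReturn` on every later call, so the second argument
   is redundant.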



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1412,8 +1422,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset,
             }
         }
     }
-
-    private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+    // visible for testing.
+    Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
                                                                         Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {

Review Comment:
   ```suggestion
                                                                  Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
   ```



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1289,15 +1289,21 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         }
 
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
-        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
         InputStream remoteSegInputStream = null;
         try {
-            // Search forward for the position of the last offset that is greater than or equal to the target offset
-            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
-            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
-
-            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
-
+            int startPos = 0;
+            RecordBatch firstBatch = null;
+            while (firstBatch == null && rlsMetadataOptional.isPresent()) {

Review Comment:
   I am not hung up on this, but don't you only need to look forward once? If
   the offset is not found in this segment, it is guaranteed to be in the next
   one, no? If that is the case, can you look in just the next segment instead
   of using a while loop?
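
To make the question concrete, here is a rough sketch of the single look-ahead I have in mind. It is not the `RemoteLogManager` code: `SegmentLookahead`, `firstAtOrAfter` and `findWithSingleLookahead` are hypothetical names, and each segment is modelled as nothing more than the batch offsets that survived compaction. It assumes the next segment is non-empty and that all of its offsets are greater than the target; if that assumption can fail, the while loop is justified.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model: a segment is just the sorted offsets left after compaction.
final class SegmentLookahead {

    // Returns the first surviving offset >= target inside one segment, if any.
    static OptionalLong firstAtOrAfter(long[] segmentOffsets, long target) {
        for (long offset : segmentOffsets) {
            if (offset >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    // Checks the segment at `index`, then at most one following segment. No loop.
    static OptionalLong findWithSingleLookahead(List<long[]> segments, int index, long target) {
        OptionalLong hit = firstAtOrAfter(segments.get(index), target);
        if (hit.isPresent() || index + 1 >= segments.size()) {
            return hit;
        }
        // Assumption: the next segment is non-empty and starts past the target.
        return firstAtOrAfter(segments.get(index + 1), target);
    }

    public static void main(String[] args) {
        // Segment 0 nominally covers offsets 0..9, but compaction kept only 0 and 3.
        List<long[]> segments = Arrays.asList(new long[] {0, 3}, new long[] {10, 12});
        // Offset 5 is gone from segment 0, so the answer comes from segment 1.
        System.out.println(findWithSingleLookahead(segments, 0, 5));
    }
}
```

If a whole segment could end up with no batch at or after the target (for example, several consecutive segments whose tails were compacted away), one look-ahead would miss the batch and the loop in the PR would be the safer choice.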



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
