adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2048256373


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() 
{
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 3));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Except tp3, all the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+        sharePartitions.put(tp3, sp3);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match with the cached entry for sp0, sp1 and 
sp2. Hence, a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp0, tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())

Review Comment:
   Same reasoning as above, as per mocked data, since we are not able to 
acquire the fetch lock for tp3, usage of tp3 is pretty much redundant. Makes 
sense, I have removed it.



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() 
{
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 3));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Except tp3, all the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+        sharePartitions.put(tp3, sp3);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match with the cached entry for sp0, sp1 and 
sp2. Hence, a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp0, tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withReplicaManager(replicaManager)
+            .withSharePartitions(sharePartitions)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // the future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet());
+        // Verify the locks are released for both local log and remote storage 
read topic partitions tp0, tp1 and tp2.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 and sp1 are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for both.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new 
LinkedHashSet<>();
+        remoteStorageFetchPartitions.add(tp0);
+        remoteStorageFetchPartitions.add(tp1);
+
+        // Mocking remote storage read result for tp0 and tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released separately for tp1 (from tryComplete).
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        // From onComplete, the locks will be released for both tp0 and tp1. 
tp0 because it was acquired from
+        // tryComplete and has remote fetch processed. tp1 will be reacquired 
in onComplete when we check for local log read.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response only contains the first remote storage fetch 
topic partition - tp0.

Review Comment:
   Mocking `sp1.acquire` is unnecessary. The test works correctly without 
mocking. Hence, I have removed it. Thanks for pointing it out. The test in 
itself is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to