junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2047511436


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -605,27 +606,26 @@ private Optional<TopicPartitionRemoteFetchInfo> 
maybePrepareRemoteStorageFetchIn
                 //  a single topic partition in a fetch request. Since, the 
logic of fetch is largely based on how consumer groups work,
                 //  we are following the same logic. However, this problem 
should be addressed as part of KAFKA-19133 which should help us perform
                 //  fetch for multiple remote fetch topic partition in a 
single share fetch request
-                remoteStorageFetchMetadataMap = Optional.of(new 
TopicPartitionRemoteFetchInfo(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get()));
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new 
TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
                 partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
                 break;
             }
         }
-        return remoteStorageFetchMetadataMap;
+        return topicPartitionRemoteFetchInfoOpt;
     }
 
     private boolean maybeProcessRemoteFetch(
         LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
-        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
-        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
     ) throws Exception {
         topicPartitionData.keySet().forEach(topicIdPartition -> {
             // topic partitions for which fetch would not be happening in this 
share fetch request.
             if 
(!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
                 // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
-                releasePartitionLocks(Set.of(topicIdPartition));
+                
releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));

Review Comment:
   It's better to release all the locks first and then add to the action queue.
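
   A minimal sketch of the suggested ordering, assuming a hypothetical stand-in class in which partitions are plain strings and the lock and action-queue calls only record events. It is not the PR's code. It shows the non-remote partitions being collected first, all of their locks being released in one pass, and the action-queue step running only afterwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for DelayedShareFetch; partitions are plain strings here.
class ReleaseThenEnqueueSketch {
    // Records the order of operations so the ordering can be inspected.
    final List<String> events = new ArrayList<>();

    // Stand-in for releasing the share partition fetch locks.
    void releasePartitionLocks(Set<String> partitions) {
        partitions.forEach(p -> events.add("release:" + p));
    }

    // Stand-in for scheduling the "check pending share fetch" action.
    void addToActionQueue(Set<String> partitions) {
        partitions.forEach(p -> events.add("enqueue:" + p));
    }

    // Collect every acquired partition except the remote fetch partition,
    // release all of their locks first, and only then add to the action queue.
    void releaseNonRemotePartitions(Set<String> acquired, String remotePartition) {
        Set<String> toRelease = new LinkedHashSet<>(acquired);
        toRelease.remove(remotePartition);
        releasePartitionLocks(toRelease);
        addToActionQueue(toRelease);
    }

    static List<String> demo() {
        ReleaseThenEnqueueSketch sketch = new ReleaseThenEnqueueSketch();
        Set<String> acquired = new LinkedHashSet<>(List.of("tp0", "tp1", "tp2"));
        sketch.releaseNonRemotePartitions(acquired, "tp2");
        return sketch.events;
    }

    public static void main(String[] args) {
        // Prints [release:tp0, release:tp1, enqueue:tp0, enqueue:tp1]
        System.out.println(demo());
    }
}
```

   Compared with calling the combined helper once per partition inside the loop, this keeps every release ahead of every enqueue and needs only a single action-queue entry.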



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +583,304 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> 
topicPartitionRemoteFetchInfoOpt = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for 
consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the 
logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem 
should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a 
single share fetch request
+                topicPartitionRemoteFetchInfoOpt = Optional.of(new 
TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult));
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return topicPartitionRemoteFetchInfoOpt;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this 
share fetch request.
+            if 
(!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+                
releasePartitionLocksAndAddToActionQueue(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(topicPartitionRemoteFetchInfo);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic 
partition information.
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = 
topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = 
topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get();
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new 
CompletableFuture<>();
+        try {
+            remoteFetchTask = 
replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(shareFetch.groupId(), 
remoteFetchTopicIdPartition.topicId(), 
remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new 
RemoteFetch(remoteFetchTopicIdPartition, 
topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, 
remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It 
should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries 
to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or 
not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = 
remoteFetchOpt.get().topicIdPartition();
+        try {
+            
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, 
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} 
immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of 
topicPartition {}, satisfy {} immediately", topicIdPartition, 
shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) 
{ // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that 
means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch 
in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), 
remoteStorageFetchException.get());
+        } finally {
+            
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void 
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> 
topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we 
release the locks for that partition,
+        // then we should check if there is a pending share fetch request for 
the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite 
call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> 
topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is 
remote fetch in-flight/completed/expired.

Review Comment:
   Could "remote fetch in-flight/completed/expired" be simplified to just "remote fetch"? Ditto below.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,9 +323,15 @@ public boolean tryComplete() {
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
-            releasePartitionLocks(topicPartitionData.keySet());
-            partitionsAcquired.clear();
-            partitionsAlreadyFetched.clear();
+            // In case we have a remote fetch exception, we have already 
released locks for partitions which have potential
+            // local log read. We do not release locks for partitions which 
have a remote storage read because we need to

Review Comment:
   Hmm, this seems not quite right. It's possible that the operation is already 
completed when we get here. In this case, `forceComplete()` won't release the 
lock for the remote fetch.
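
   A minimal sketch of the concern, assuming a simplified, hypothetical model of a delayed operation in which `forceComplete()` runs the completion logic only for the first caller. The class and method names are illustrative and are not the Kafka classes. It shows that once the operation has been completed elsewhere, a later `forceComplete()` is a no-op, so a lock taken afterwards stays held unless the caller checks the return value and releases it explicitly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a delayed operation; not the Kafka class.
class ForceCompleteSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private boolean remoteLockHeld = false;

    void acquireRemoteLock() { remoteLockHeld = true; }
    void releaseRemoteLock() { remoteLockHeld = false; }
    boolean isRemoteLockHeld() { return remoteLockHeld; }

    // Only the first caller completes the operation; the release below
    // stands in for the lock release done in onComplete().
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            releaseRemoteLock();
            return true;
        }
        return false;
    }

    // The exception path relies on forceComplete() alone.
    static boolean lockLeakedWhenIgnoringResult() {
        ForceCompleteSketch op = new ForceCompleteSketch();
        op.forceComplete();     // already completed elsewhere, e.g. on expiry
        op.acquireRemoteLock(); // lock taken afterwards for the remote fetch
        op.forceComplete();     // no-op: the completion logic does not run again
        return op.isRemoteLockHeld();
    }

    // The exception path releases the lock when forceComplete() returns false.
    static boolean lockLeakedWhenCheckingResult() {
        ForceCompleteSketch op = new ForceCompleteSketch();
        op.forceComplete();
        op.acquireRemoteLock();
        if (!op.forceComplete()) {
            op.releaseRemoteLock();
        }
        return op.isRemoteLockHeld();
    }

    public static void main(String[] args) {
        System.out.println(lockLeakedWhenIgnoringResult()); // true
        System.out.println(lockLeakedWhenCheckingResult()); // false
    }
}
```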



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());

Review Comment:
   Do we need tp1 in this test? We are not returning its result anyway.



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() 
{
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 3));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Except tp3, all the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+        sharePartitions.put(tp3, sp3);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match with the cached entry for sp0, sp1 and 
sp2. Hence, a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp0, tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withReplicaManager(replicaManager)
+            .withSharePartitions(sharePartitions)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // the future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet());
+        // Verify the locks are released for both local log and remote storage 
read topic partitions tp0, tp1 and tp2.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode());
+        assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 and sp1 are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for both.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new 
LinkedHashSet<>();
+        remoteStorageFetchPartitions.add(tp0);
+        remoteStorageFetchPartitions.add(tp1);
+
+        // Mocking remote storage read result for tp0 and tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released separately for tp1 (from tryComplete).
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1));
+        // From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from
+        // tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response only contains the first remote storage fetch topic partition - tp0.

Review Comment:
   Is that accurate? It seems that we mock `sp1.acquire` too.
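
   To make the question concrete, here is a minimal stand-alone sketch of how I read the selection loop in `maybePrepareRemoteStorageFetchInfo`. It is plain Java, not the Kafka classes: `FirstRemotePartitionSketch`, `firstRemotePartition` and the `Boolean` flag standing in for `delayedRemoteStorageFetch.isPresent()` are made up for illustration. The loop stops at the first entry that carries a remote fetch, so only tp0 gets a remote fetch even when tp1 needs one too. Whether tp1 still ends up in the response depends on what `onComplete` does with it, which this sketch does not model.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class FirstRemotePartitionSketch {

    // Hypothetical stand-in for the read response: key = partition name,
    // value = true if the read result carries a delayed remote storage fetch.
    public static Optional<String> firstRemotePartition(LinkedHashMap<String, Boolean> readResponse) {
        for (Map.Entry<String, Boolean> entry : readResponse.entrySet()) {
            if (entry.getValue()) {
                // Mirrors the `break` in the PR: later remote partitions are ignored.
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Boolean> readResponse = new LinkedHashMap<>();
        readResponse.put("tp0", true);
        readResponse.put("tp1", true);
        // Insertion order decides which partition wins.
        System.out.println(firstRemotePartition(readResponse)); // prints "Optional[tp0]"
    }
}
```

   If that reading is right, the comment on the assertion should say why tp1 is (or is not) in the response given that `sp1.acquire` is mocked.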



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap 
= Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for 
consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the 
logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem 
should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a 
single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new 
TopicPartitionRemoteFetchInfo(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this 
share fetch request.
+            if 
(!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(topicPartitionRemoteFetchInfo, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic 
partition information.
+     * @param replicaManagerReadResponse - The replica manager read response 
containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = 
topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = 
topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = 
replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new 
CompletableFuture<>();
+        try {
+            remoteFetchTask = 
replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(shareFetch.groupId(), 
remoteFetchTopicIdPartition.topicId(), 
remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new 
RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, 
remoteFetchResult, remoteStorageFetchInfo));

Review Comment:
   Could we at least set both instance variables (`remoteStorageFetchException` and 
`remoteFetchOpt`) in the same method? Otherwise, it's a bit hard to track where the 
internal state is changed.
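
A minimal sketch of the suggestion above, not the actual `DelayedShareFetch` code. The class `RemoteFetchStateSketch`, the method `scheduleRemoteFetch`, and the `String` stand-in for `RemoteFetch` are hypothetical and exist only for illustration. The point is that a single method is the only place where either field is written, so a reader has one place to look for state changes.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for DelayedShareFetch.
// Only the two state fields discussed in the comment are modelled.
class RemoteFetchStateSketch {
    // Stand-in for the pending remote fetch (a String replaces RemoteFetch here).
    private Optional<String> remoteFetchOpt = Optional.empty();
    // Stand-in for the exception recorded when scheduling fails.
    private Optional<Exception> remoteStorageFetchException = Optional.empty();

    // The only method that mutates either field: exactly one of them is set per call.
    boolean scheduleRemoteFetch(Callable<String> scheduler) {
        try {
            remoteFetchOpt = Optional.of(scheduler.call());
            return true;
        } catch (Exception e) {
            remoteStorageFetchException = Optional.of(e);
            return false;
        }
    }

    Optional<String> remoteFetchOpt() {
        return remoteFetchOpt;
    }

    Optional<Exception> remoteStorageFetchException() {
        return remoteStorageFetchException;
    }
}
```

With this shape, the caller would only inspect the return value (or the two accessors) and decide whether to throw, rather than assigning one of the fields itself.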



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1155,6 +1170,537 @@ public void testOnCompleteExecutionOnTimeout() {
         assertEquals(1, delayedShareFetch.expiredRequestMeter().count());
     }
 
+    @Test
+    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertFalse(delayedShareFetch.tryComplete());
+        assertFalse(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for local log read topic partitions 
tp0 and tp1.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchTryCompleteThrowsException() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+
+        // Fetch offset does not match with the cached entry for sp0 and sp1. 
Hence, a replica manager fetch will happen for sp0 and sp1.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking local log read result for tp0 and remote storage read 
result for tp1.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), 
Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Exception will be thrown during the 
creation of remoteFetch object.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenThrow(new 
RejectedExecutionException("Exception thrown"));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withExceptionHandler(exceptionHandler)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        // tryComplete returns true and goes to forceComplete once the 
exception occurs.
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // The future of shareFetch completes.
+        assertTrue(shareFetch.isCompleted());
+        assertFalse(future.isCompletedExceptionally());
+        assertEquals(Set.of(tp1), future.join().keySet());
+        // Exception occurred and was handled.
+        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic 
partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp1));
+        Mockito.verify(delayedShareFetch, times(1)).onComplete();
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset matches with the cached entry for sp0 but not for sp1 
and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during 
tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(10, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), 
Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())
+        ).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock but the broker becomes unavailable.
+        Future<Void> remoteFetchTask = mock(Future.class);
+        doAnswer(invocation -> {
+            when(remoteFetchTask.isCancelled()).thenReturn(true);
+            return false;
+        }).when(remoteFetchTask).cancel(false);
+
+        when(remoteFetchTask.cancel(false)).thenReturn(true);
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(remoteFetchTask);
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+        
when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1, tp2)))
+            .build());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        
assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled());
+        // Partition locks should be released for all 3 topic partitions
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
+        assertTrue(shareFetch.isCompleted());
+        // Share fetch response contained tp0 and tp1 (local fetch) but not 
tp2, since it errored out.
+        assertEquals(Set.of(tp0, tp1), future.join().keySet());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.empty(),
+            Optional.of(new TimeoutException("Error occurred while creating 
remote fetch result")) // Remote fetch result is returned with an error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void 
testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        // sp0 is acquirable, sp1 is not acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        // Fetch offset does not match with the cached entry for sp0. Hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object completes within 
tryComplete in this mock, hence request will move on to forceComplete.
+        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
+            Optional.of(REMOTE_FETCH_INFO),
+            Optional.empty() // Remote fetch result is returned successfully 
without error.
+        );
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        doAnswer(invocationOnMock -> {
+            // Make sure that the callback is called to populate 
remoteFetchResult for the mock behaviour.
+            Consumer<RemoteLogReadResult> callback = 
invocationOnMock.getArgument(1);
+            callback.accept(remoteFetchResult);
+            return CompletableFuture.completedFuture(remoteFetchResult);
+        }).when(remoteLogManager).asyncRead(any(), any());
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
+            .build());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+
+        assertTrue(delayedShareFetch.isCompleted());
+        // Pending remote fetch object gets created for delayed share fetch.
+        assertNotNull(delayedShareFetch.remoteFetch());
+        // Verify the locks are released for tp0.
+        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(shareFetch.isCompleted());
+        assertEquals(Set.of(tp0), future.join().keySet());
+        assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode());
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
+    @Test
+    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() 
{
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 2));
+        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 3));
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
+
+        // Except tp3, all the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp3.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+        sharePartitions.put(tp2, sp2);
+        sharePartitions.put(tp3, sp3);
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            future, List.of(tp0, tp1, tp2, tp3), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+        when(sp1.nextFetchOffset()).thenReturn(20L);
+        when(sp2.nextFetchOffset()).thenReturn(30L);
+
+        // Fetch offset does not match with the cached entry for sp0, sp1 and 
sp2. Hence, a replica manager fetch will happen for all of them in tryComplete.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+        when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
+            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // Mocking local log read result for tp0, tp1 and remote storage read 
result for tp2 on first replicaManager readFromLog call(from tryComplete).
+        // Mocking local log read result for tp0 and tp1 on second 
replicaManager readFromLog call(from onComplete).
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of(tp2))
+        ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, 
tp1), Set.of())

Review Comment:
   Do we need tp3 in this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
