junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2042492747


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -110,37 +126,45 @@ public DelayedShareFetch(
             sharePartitions,
             
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
             shareGroupMetrics,
-            time
+            time,
+            Optional.empty()
         );
     }
 
+    // The direct usage of this constructor is only from tests.

Review Comment:
   Could we move the javadoc here and add the description for the new param 
`remoteFetchOpt`?
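   
   e.g. the moved javadoc could gain something along these lines for the new param (just a sketch, wording as you see fit):
   ```java
    * @param remoteFetchOpt Optional containing the in-flight remote storage fetch, if one has
    *                       already been scheduled for this request; empty otherwise.
   ```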



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote 
storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer 
groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of 
fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be 
addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share 
fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = 
getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = 
remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> 
fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> 
fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new 
CompletableFuture<>();
+        try {
+            remoteFetchTask = 
replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(shareFetch.groupId(), 
remoteFetchTopicIdPartition.topicId(), 
remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new 
RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, 
remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to 
perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further 
processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to 
remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote 
storage fetch
+     */
+    private TopicIdPartition 
getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, 
RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> 
firstRemoteStorageFetchInfo = 
remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = 
firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It 
should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries 
to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or 
not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : 
remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != 
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+                    
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                }
+            } catch (KafkaStorageException e) { // Case a
+                log.debug("TopicPartition {} is in an offline log directory, 
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (UnknownTopicOrPartitionException e) { // Case b
+                log.debug("Broker no longer knows of topicPartition {}, 
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (NotLeaderOrFollowerException e) { // Case c
+                log.debug("Broker is no longer the leader or follower of 
topicPartition {}, satisfy {} immediately", topicIdPartition, 
shareFetch.fetchParams());
+                canComplete = true;
+            }
+            if (canComplete)
+                break;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) 
{ // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that 
means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch 
in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), 
remoteStorageFetchException.get());
+        } finally {
+            
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void 
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> 
topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we 
release the locks for that partition,
+        // then we should check if there is a pending share fetch request for 
the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite 
call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> 
topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is 
remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData 
= new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new 
ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                if (remoteFetch.remoteFetchResult().get().error.isPresent()) {

Review Comment:
   Could we just call `remoteFetch.remoteFetchResult().get()` once and reuse 
the result?
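   
   e.g. something like this, as a sketch (the elided parts stay as they are today):
   ```java
   RemoteFetch remoteFetch = remoteFetchOpt.get();
   RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
   if (remoteLogReadResult.error.isPresent()) {
       Throwable error = remoteLogReadResult.error.get();
       // ... build the error ShareFetchPartitionData as before ...
   } else {
       FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
       // ... build the FetchPartitionData from the local metadata plus the remote records as before ...
   }
   ```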



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);

Review Comment:
   Could we just release the lock in the loop above and avoid using 
localFetchTopicPartitions?
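   
   i.e. something like this (a sketch):
   ```java
   topicPartitionData.keySet().forEach(topicIdPartition -> {
       if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
           // This partition will be served from the local log, so its acquisition lock
           // is not needed for the remote fetch.
           releasePartitionLocks(Set.of(topicIdPartition));
       }
   });
   ```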



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -110,37 +126,45 @@ public DelayedShareFetch(
             sharePartitions,
             
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
             shareGroupMetrics,
-            time
+            time,
+            Optional.empty()
         );
     }
 
+    // The direct usage of this constructor is only from tests.
     DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
         PartitionMaxBytesStrategy partitionMaxBytesStrategy,
         ShareGroupMetrics shareGroupMetrics,
-        Time time
+        Time time,
+        Optional<RemoteFetch> remoteFetchOpt
     ) {
         super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
         this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
-        this.partitionsAlreadyFetched = new LinkedHashMap<>();
+        this.localPartitionsAlreadyFetched = new LinkedHashMap<>();
         this.exceptionHandler = exceptionHandler;
         this.sharePartitions = sharePartitions;
         this.partitionMaxBytesStrategy = partitionMaxBytesStrategy;
         this.shareGroupMetrics = shareGroupMetrics;
         this.time = time;
         this.acquireStartTimeMs = time.hiResClockMs();
+        this.remoteFetchOpt = remoteFetchOpt;
+        this.remoteStorageFetchException = Optional.empty();
         // Register metrics for DelayedShareFetch.
         KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", 
"DelayedShareFetchMetrics");
         this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, 
"requests", TimeUnit.SECONDS);
     }
 
     @Override
     public void onExpiration() {
+        if (remoteFetchOpt.isPresent()) {
+            cancelRemoteFetchTask();

Review Comment:
   This is unnecessary. `onExpiration()` is called after `onComplete()`. When 
we get here, the remote fetch would have been cancelled already if needed.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -152,58 +176,68 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         // We are utilizing lock so that onComplete doesn't do a dirty read 
for instance variables -
-        // partitionsAcquired and partitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
+        // partitionsAcquired and localPartitionsAlreadyFetched, since these 
variables can get updated in a different tryComplete thread.
         lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
             + "topic partitions {}", shareFetch.groupId(), 
shareFetch.memberId(),
             partitionsAcquired.keySet());
 
         try {
-            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
-            // tryComplete did not invoke forceComplete, so we need to check 
if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty()) {
-                topicPartitionData = acquirablePartitions();
-                // The TopicPartitionsAcquireTimeMs metric signifies the 
tension when acquiring the locks
-                // for the share partition, hence if no partitions are yet 
acquired by tryComplete,
-                // we record the metric here. Do not check if the request has 
successfully acquired any
-                // partitions now or not, as then the upper bound of request 
timeout shall be recorded
-                // for the metric.
-                updateAcquireElapsedTimeMetric();
+            if (remoteStorageFetchException.isPresent()) {
+                completeErroneousRemoteShareFetchRequest();
+            } else if (remoteFetchOpt.isPresent()) {
+                completeRemoteStorageShareFetchRequest();
             } else {
-                // tryComplete invoked forceComplete, so we can use the data 
from tryComplete.
-                topicPartitionData = partitionsAcquired;
+                completeLocalLogShareFetchRequest();
             }
-
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we 
complete the request with an empty response.
-                
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
-                shareFetch.maybeComplete(Map.of());
-                return;
-            } else {
-                // Update metric to record acquired to requested partitions.
-                double requestTopicToAcquired = (double) 
topicPartitionData.size() / shareFetch.topicIdPartitions().size();
-                
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(requestTopicToAcquired * 100));
-            }
-            log.trace("Fetchable share partitions data: {} with groupId: {} 
fetch params: {}",
-                topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
-
-            completeShareFetchRequest(topicPartitionData);
         } finally {
             lock.unlock();
         }
     }
 
-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, 
Long> topicPartitionData) {
+    private void completeLocalLogShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
+        // tryComplete did not invoke forceComplete, so we need to check if we 
have any partitions to fetch.
+        if (partitionsAcquired.isEmpty()) {
+            topicPartitionData = acquirablePartitions(sharePartitions);
+            // The TopicPartitionsAcquireTimeMs metric signifies the tension 
when acquiring the locks
+            // for the share partition, hence if no partitions are yet 
acquired by tryComplete,
+            // we record the metric here. Do not check if the request has 
successfully acquired any
+            // partitions now or not, as then the upper bound of request 
timeout shall be recorded
+            // for the metric.
+            updateAcquireElapsedTimeMetric();
+        } else {
+            // tryComplete invoked forceComplete, so we can use the data from 
tryComplete.
+            topicPartitionData = partitionsAcquired;
+        }
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete 
the request with an empty response.
+            
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+            shareFetch.maybeComplete(Map.of());
+            return;
+        } else {
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) topicPartitionData.size() 
/ shareFetch.topicIdPartitions().size();
+            
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(requestTopicToAcquired * 100));
+        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch 
params: {}",
+            topicPartitionData, shareFetch.groupId(), 
shareFetch.fetchParams());
+
+        processAcquiredTopicPartitions(topicPartitionData);
+    }
+
+    private void 
processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long> 
topicPartitionData) {

Review Comment:
   Since there is only 1 caller, perhaps we could just fold it into the caller? Otherwise, we probably need to give this method a clearer name to indicate that it's only used for local logs.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);

Review Comment:
   It's a bit weird to populate `localPartitionsAlreadyFetched` here. Unlike fetching from local partitions, we only use the returned metadata here, not the fetched records. Perhaps we could just fold the metadata into `RemoteFetch`.
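   
   For example, one possible shape (just a sketch) would be to carry the local read result for the remote partition inside `RemoteFetch` itself:
   ```java
   public record RemoteFetch(
       TopicIdPartition topicIdPartition,
       LogReadResult logReadResult,   // metadata from the local read that triggered the remote fetch
       Future<Void> remoteFetchTask,
       CompletableFuture<RemoteLogReadResult> remoteFetchResult,
       RemoteStorageFetchInfo remoteFetchInfo
   ) { }
   ```
   `completeRemoteStorageShareFetchRequest()` could then read the high watermark and log start offset from `remoteFetch.logReadResult()` instead of from `localPartitionsAlreadyFetched`.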



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());

Review Comment:
   Since we only support 1 remote partition, would it be simpler if we just 
pass a singleton around instead of a map?
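   
   As a sketch of what that could look like (assuming we keep the "first candidate wins" behavior):
   ```java
   private Optional<Map.Entry<TopicIdPartition, RemoteStorageFetchInfo>> maybePrepareRemoteStorageFetchInfo(
       LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
       LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
   ) {
       for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
           TopicIdPartition topicIdPartition = entry.getKey();
           Optional<RemoteStorageFetchInfo> remoteFetchInfo = entry.getValue().info().delayedRemoteStorageFetch;
           if (remoteFetchInfo.isPresent()) {
               partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
               return Optional.of(Map.entry(topicIdPartition, remoteFetchInfo.get()));
           }
       }
       return Optional.empty();
   }
   ```
   The callers would then handle at most one remote fetch candidate explicitly instead of iterating a map that can only ever hold one usable entry.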



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote 
storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer 
groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of 
fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be 
addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share 
fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = 
getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = 
remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> 
fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> 
fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new 
CompletableFuture<>();
+        try {
+            remoteFetchTask = 
replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(shareFetch.groupId(), 
remoteFetchTopicIdPartition.topicId(), 
remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new 
RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, 
remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to 
perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further 
processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to 
remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote 
storage fetch
+     */
+    private TopicIdPartition 
getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, 
RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {

Review Comment:
   We don't use `get` prefixes for accessors, so this can just be `remoteFetchTopicIdPartition`.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote 
storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse

Review Comment:
   Could we add the javadoc?
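   
   e.g. something along these lines for the missing tag (just a suggestion for the wording):
   ```java
    * @param replicaManagerReadResponse - The replica manager read response for the topic partitions acquired in this share fetch
   ```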



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for 
RemoteStorageFetchInfo could not be scheduled successfully else returns empty 
optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote 
storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer 
groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of 
fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be 
addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share 
fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = 
getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = 
remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> 
fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> 
fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new 
CompletableFuture<>();
+        try {
+            remoteFetchTask = 
replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(shareFetch.groupId(), 
remoteFetchTopicIdPartition.topicId(), 
remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new 
RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, 
remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to 
perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further 
processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to 
remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote 
storage fetch
+     */
+    private TopicIdPartition 
getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, 
RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> 
firstRemoteStorageFetchInfo = 
remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = 
firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It 
should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries 
to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or 
not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : 
remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != 
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+                    
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                }
+            } catch (KafkaStorageException e) { // Case a
+                log.debug("TopicPartition {} is in an offline log directory, 
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (UnknownTopicOrPartitionException e) { // Case b
+                log.debug("Broker no longer knows of topicPartition {}, 
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (NotLeaderOrFollowerException e) { // Case c
+                log.debug("Broker is no longer the leader or follower of 
topicPartition {}, satisfy {} immediately", topicIdPartition, 
shareFetch.fetchParams());
+                canComplete = true;
+            }
+            if (canComplete)
+                break;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) 
{ // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that 
means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch 
in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), 
remoteStorageFetchException.get());
+        } finally {
+            
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void 
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> 
topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we 
release the locks for that partition,
+        // then we should check if there is a pending share fetch request for 
the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite 
call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> 
topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have 
identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is 
remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData 
= new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new 
ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
+                    Throwable error = 
remoteFetch.remoteFetchResult().get().error.get();
+                    // If there is any error for the remote fetch topic 
partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            
partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = 
remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = 
remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = 
localPartitionsAlreadyFetched.get(topicIdPartition);
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            
partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? 
OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : 
OptionalLong.empty(),
+                                info.abortedTransactions,
+                                
logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) 
logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If remote fetch bytes  < shareFetch.fetchParams().maxBytes, 
then we will try for a local read.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> 
nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition) && 
!remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, 
sharePartition);
+                    }
+                });
+                nonRemoteFetchTopicPartitionData = 
acquirablePartitions(nonRemoteFetchSharePartitions);
+                if (!nonRemoteFetchTopicPartitionData.isEmpty()) {
+                    log.trace("Fetchable local share partitions for a remote 
share fetch request data: {} with groupId: {} fetch params: {}",
+                        nonRemoteFetchTopicPartitionData, 
shareFetch.groupId(), shareFetch.fetchParams());
+
+                    LinkedHashMap<TopicIdPartition, LogReadResult> 
responseData = readFromLog(
+                        nonRemoteFetchTopicPartitionData,
+                        
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - 
readableBytes, nonRemoteFetchTopicPartitionData.keySet(), 
nonRemoteFetchTopicPartitionData.size()));
+                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
responseData.entrySet()) {
+                        if 
(entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+                            shareFetchPartitionData.add(
+                                new ShareFetchPartitionData(
+                                    entry.getKey(),
+                                    
nonRemoteFetchTopicPartitionData.get(entry.getKey()),
+                                    
entry.getValue().toFetchPartitionData(false)
+                                )
+                            );
+                        }
+                    }
+                }
+            }
+
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) 
(partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / 
shareFetch.topicIdPartitions().size();
+            if (requestTopicToAcquired > 0)
+                
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(requestTopicToAcquired * 100));
+
+            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
remoteFetchResponse = ShareFetchUtils.processFetchResponse(
+                shareFetch, shareFetchPartitionData, sharePartitions, 
replicaManager, exceptionHandler);
+            shareFetch.maybeComplete(remoteFetchResponse);
+            log.trace("Remote share fetch request completed successfully, 
response: {}", remoteFetchResponse);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
+            Set<TopicIdPartition> topicIdPartitions = new 
LinkedHashSet<>(partitionsAcquired.keySet());
+            
topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+            handleFetchException(shareFetch, topicIdPartitions, e);
+        } finally {
+            Set<TopicIdPartition> topicIdPartitions = new 
LinkedHashSet<>(partitionsAcquired.keySet());
+            
topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+            releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
+        }
+    }
+
+    /**
+     * Cancel the remote storage read task, if it has not been executed yet 
and avoid interrupting the task if it is
+     * already running as it may force closing opened/cached resources as 
transaction index.
+     * Note - This function should only be called when we know that there is a 
remote fetch in-flight/completed.
+     */
+    private void cancelRemoteFetchTask() {
+        boolean cancelled = 
remoteFetchOpt.get().remoteFetchTask().cancel(false);
+        if (!cancelled) {
+            log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could 
not be cancelled and its isDone value is {}",
+                remoteFetchOpt.get().remoteFetchInfo(), 
remoteFetchOpt.get().remoteFetchTask().isDone());
+        }
+    }
+
+    public record RemoteFetch(
+        TopicIdPartition topicIdPartition,
+        Future<Void> remoteFetchTask,
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult,
+        RemoteStorageFetchInfo remoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> 
fetchOffsetMetadataMap

Review Comment:
   Since there is only 1 partition, we don't need a map here, right?
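   
   e.g. a single field should do (sketch):
   ```java
   public record RemoteFetch(
       TopicIdPartition topicIdPartition,
       Future<Void> remoteFetchTask,
       CompletableFuture<RemoteLogReadResult> remoteFetchResult,
       RemoteStorageFetchInfo remoteFetchInfo,
       LogOffsetMetadata fetchOffsetMetadata   // offset metadata for the single remote fetch partition
   ) { }
   ```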



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, 
logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, 
topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, 
logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> 
remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local 
log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new 
LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were 
acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = 
processRemoteFetchOrException(remoteStorageFetchInfoMap, 
replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();

Review Comment:
   Do we need this? We already register the purgatory check in the callback passed to `replicaManager.remoteLogManager().get().asyncRead`, so it will run when the remote fetch completes.
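   
   If the callback is sufficient, the tail of `maybeProcessRemoteFetch` could be reduced to something like this (sketch):
   ```java
   Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
   if (exceptionOpt.isPresent()) {
       remoteStorageFetchException = exceptionOpt;
       throw exceptionOpt.get();
   }
   // Rely on the asyncRead callback to trigger the purgatory check once the remote read finishes.
   return false;
   ```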



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote storage fetch
+     */
+    private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+                    replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                }
+            } catch (KafkaStorageException e) { // Case a
+                log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (UnknownTopicOrPartitionException e) { // Case b
+                log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (NotLeaderOrFollowerException e) { // Case c
+                log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            }
+            if (canComplete)
+                break;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
+                    Throwable error = remoteFetch.remoteFetchResult().get().error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition);
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,

Review Comment:
   `ShareFetchUtils.maybeSliceFetchRecords()` only supports slicing for 
FileRecords. Should we extend it to support MemoryRecords too?
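
   For illustration only, a rough sketch of what a MemoryRecords-aware slice could look like (the helper name, placement, and signature here are hypothetical, not the actual `ShareFetchUtils` API): walk the batches, keep the ones overlapping the acquired offset range, and re-wrap that region of the backing buffer.

       import java.nio.ByteBuffer;
       import org.apache.kafka.common.record.MemoryRecords;
       import org.apache.kafka.common.record.RecordBatch;

       final class MemoryRecordsSliceSketch {
           // Keep only the batches overlapping [firstAcquiredOffset, lastAcquiredOffset]
           // and return them as a new MemoryRecords view over the same buffer.
           static MemoryRecords slice(MemoryRecords records, long firstAcquiredOffset, long lastAcquiredOffset) {
               int start = -1; // byte position of the first overlapping batch
               int end = -1;   // byte position just past the last overlapping batch
               int pos = 0;
               for (RecordBatch batch : records.batches()) {
                   int batchSize = batch.sizeInBytes();
                   boolean overlaps = batch.lastOffset() >= firstAcquiredOffset && batch.baseOffset() <= lastAcquiredOffset;
                   if (overlaps) {
                       if (start == -1) start = pos;
                       end = pos + batchSize;
                   }
                   pos += batchSize;
               }
               if (start == -1) return MemoryRecords.EMPTY;
               ByteBuffer slice = records.buffer().duplicate();
               slice.position(start);
               slice.limit(end);
               return MemoryRecords.readableRecords(slice.slice());
           }
       }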



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote storage fetch
+     */
+    private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {

Review Comment:
   Why do we need this check?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));

Review Comment:
   Since we are only processing 1 partition for remote data, should we just 
acquire the lock for 1 partition?
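
   As a sketch of that suggestion (illustrative only, not the PR's code), `maybePrepareRemoteStorageFetchInfo` could record only the first partition that needs a remote read and leave the rest untouched, using the same identifiers as the diff above:

       // Illustrative variant: track only the first remote-storage partition, since a
       // single share fetch currently performs remote fetch for at most one partition.
       for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
           if (entry.getValue().info().delayedRemoteStorageFetch.isPresent()) {
               TopicIdPartition tp = entry.getKey();
               remoteStorageFetchMetadataMap.put(tp, entry.getValue().info().delayedRemoteStorageFetch.get());
               partitionsAcquired.put(tp, topicPartitionData.get(tp));
               localPartitionsAlreadyFetched.put(tp, entry.getValue());
               break; // stop after the first remote-storage candidate
           }
       }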



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -242,8 +268,14 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top
      */
     @Override
     public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();
+        // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.
+        // This will help to prevent starving remote storage partitions and wasting the significant upfront work involved with
+        // kicking off a fetch from remote storage.

Review Comment:
   Not sure that I understand this comment. Why would we starve remote 
partitions?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+        replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                remoteStorageFetchMetadataMap.put(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get());
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                localPartitionsAlreadyFetched.put(topicIdPartition, logReadResult);
+            }
+        });
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        // topic partitions for which fetching would be happening from local log and not remote storage.
+        Set<TopicIdPartition> localFetchTopicPartitions = new LinkedHashSet<>();
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+                localFetchTopicPartitions.add(topicIdPartition);
+            }
+        });
+        // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+        releasePartitionLocks(localFetchTopicPartitions);
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(remoteStorageFetchInfoMap, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param remoteStorageFetchInfoMap - The topic partition to remote storage fetch info map
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+        //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+        //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+        //  fetch for multiple remote fetch topic partition in a single share fetch request
+        TopicIdPartition remoteFetchTopicIdPartition = getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+        LinkedHashMap<TopicIdPartition, LogOffsetMetadata> fetchOffsetMetadataMap = new LinkedHashMap<>();
+        remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) -> fetchOffsetMetadataMap.put(
+            topicIdPartition,
+            replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+        ));
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo, fetchOffsetMetadataMap));
+        return Optional.empty();
+    }
+
+    /**
+     * This function returns the first topic partition for which we need to perform remote storage fetch. We remove all the
+     * other partitions that can have a remote storage fetch for further processing and release the fetch locks for them.
+     * @param remoteStorageFetchInfoMap map containing topic partition to remote storage fetch information.
+     * @return the first topic partition for which we need to perform remote storage fetch
+     */
+    private TopicIdPartition getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+        Map.Entry<TopicIdPartition, RemoteStorageFetchInfo> firstRemoteStorageFetchInfo = remoteStorageFetchInfoMap.entrySet().iterator().next();
+        TopicIdPartition remoteFetchTopicIdPartition = firstRemoteStorageFetchInfo.getKey();
+        remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+            if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+                partitionsAcquired.remove(topicIdPartition);
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        return remoteFetchTopicIdPartition;
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry : remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+            try {
+                if (fetchOffsetMetadata != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+                    replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                }
+            } catch (KafkaStorageException e) { // Case a
+                log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (UnknownTopicOrPartitionException e) { // Case b
+                log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (NotLeaderOrFollowerException e) { // Case c
+                log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            }
+            if (canComplete)
+                break;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
+                    Throwable error = remoteFetch.remoteFetchResult().get().error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = localPartitionsAlreadyFetched.get(topicIdPartition);
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
+                                info.abortedTransactions,
+                                logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition) && !remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
+                    }
+                });
+                nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);
+                if (!nonRemoteFetchTopicPartitionData.isEmpty()) {
+                    log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}",
+                        nonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+                    LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog(
+                        nonRemoteFetchTopicPartitionData,
+                        partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, nonRemoteFetchTopicPartitionData.keySet(), nonRemoteFetchTopicPartitionData.size()));
+                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
+                        if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+                            shareFetchPartitionData.add(
+                                new ShareFetchPartitionData(
+                                    entry.getKey(),
+                                    nonRemoteFetchTopicPartitionData.get(entry.getKey()),
+                                    entry.getValue().toFetchPartitionData(false)
+                                )
+                            );
+                        }
+                    }
+                }
+            }
+
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();

Review Comment:
   requestTopicToAcquired => acquiredRatio ?


