junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2045585739


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));

Review Comment:
   After releasing the locks here, it's possible for another client to make progress. Should we trigger a check on the purgatory?
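A minimal sketch of what this suggestion could look like, assuming the fix reuses the pattern of `releasePartitionLocksAndAddToActionQueue` from later in this diff: release the lock first, then queue a purgatory check that runs after the current call stack unwinds, rather than calling it inline. `ActionQueueSketch`, `checkAndComplete`, and the string partition keys are hypothetical stand-ins, not Kafka classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for the broker pieces involved; not Kafka's real classes.
class ActionQueueSketch {
    final Set<String> lockedPartitions = new HashSet<>();
    final Queue<Runnable> actionQueue = new ArrayDeque<>();
    final List<String> purgatoryChecks = new ArrayList<>();

    // Stand-in for replicaManager.completeDelayedShareFetchRequest(key).
    void checkAndComplete(String partitionKey) {
        purgatoryChecks.add(partitionKey);
    }

    // Release the locks first, then defer the purgatory check so the current
    // tryComplete() call stack does not recurse into another waiting request.
    void releaseAndScheduleCheck(Set<String> partitions) {
        lockedPartitions.removeAll(partitions);
        List<String> snapshot = new ArrayList<>(partitions);
        actionQueue.add(() -> snapshot.forEach(this::checkAndComplete));
    }

    // Stand-in for the broker draining delayed actions after handling the request.
    void drainActions() {
        Runnable action;
        while ((action = actionQueue.poll()) != null) {
            action.run();
        }
    }
}
```

Deferring through a queue keeps the released partition visible to other waiters without re-entering the purgatory from inside the current operation, which is the same reason given in the PR's own `releasePartitionLocksAndAddToActionQueue` comment.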



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));

Review Comment:
   Could we include `logReadResult` in `TopicPartitionRemoteFetchInfo`? Then we don't need to pass in `replicaManagerReadResponse` to `maybeProcessRemoteFetch()`.
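A sketch of the suggested shape, assuming `TopicPartitionRemoteFetchInfo` carries the whole `LogReadResult` and derives the remote storage fetch info from it. The `*Stub` records and `TopicPartitionRemoteFetchInfoSketch` are simplified, hypothetical stand-ins for the Kafka types, not their real definitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-ins for RemoteStorageFetchInfo and LogReadResult.
record RemoteStorageFetchInfoStub(String topicPartition, long fetchOffset) { }

record LogReadResultStub(long highWatermark, Optional<RemoteStorageFetchInfoStub> delayedRemoteStorageFetch) { }

// Carrying the whole LogReadResult means later steps no longer need the full
// replica manager read response just to look this one entry up again.
record TopicPartitionRemoteFetchInfoSketch(String topicIdPartition, LogReadResultStub logReadResult) {

    // Only constructed for entries whose remote fetch info is present.
    RemoteStorageFetchInfoStub remoteStorageFetchInfo() {
        return logReadResult.delayedRemoteStorageFetch().orElseThrow();
    }

    // Mirrors the single-partition limitation: pick the first remote entry only.
    static Optional<TopicPartitionRemoteFetchInfoSketch> firstRemote(
        LinkedHashMap<String, LogReadResultStub> readResponse
    ) {
        for (Map.Entry<String, LogReadResultStub> e : readResponse.entrySet()) {
            if (e.getValue().delayedRemoteStorageFetch().isPresent()) {
                return Optional.of(new TopicPartitionRemoteFetchInfoSketch(e.getKey(), e.getValue()));
            }
        }
        return Optional.empty();
    }
}
```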



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));

Review Comment:
   It's a bit inconsistent to store the non-error result in an instance variable but return the error result to the caller. It would be clearer to handle both the error and non-error results in the same way.
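One way to make the two outcomes symmetric is to return a single value that carries either the scheduled fetch or the scheduling error, and let the caller decide what to store. This is a hypothetical sketch; `RemoteFetchScheduleResult` and its `Supplier`-based `schedule` helper are illustrative names, not part of the PR or of Kafka.

```java
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

// Hypothetical result type: both outcomes of scheduling a remote read are
// returned to the caller, instead of success going into a field and failure
// coming back as a return value.
final class RemoteFetchScheduleResult<T> {
    private final T remoteFetch;   // non-null on success
    private final Exception error; // non-null on failure

    private RemoteFetchScheduleResult(T remoteFetch, Exception error) {
        this.remoteFetch = remoteFetch;
        this.error = error;
    }

    static <T> RemoteFetchScheduleResult<T> success(T remoteFetch) {
        return new RemoteFetchScheduleResult<>(remoteFetch, null);
    }

    static <T> RemoteFetchScheduleResult<T> failure(Exception error) {
        return new RemoteFetchScheduleResult<>(null, error);
    }

    Optional<T> remoteFetch() {
        return Optional.ofNullable(remoteFetch);
    }

    Optional<Exception> error() {
        return Optional.ofNullable(error);
    }

    // `scheduler` stands in for the asyncRead call that builds the RemoteFetch.
    static <T> RemoteFetchScheduleResult<T> schedule(Supplier<T> scheduler) {
        try {
            return success(scheduler.get());
        } catch (RejectedExecutionException e) {
            // The PR additionally logs a warning for this case.
            return failure(e);
        } catch (RuntimeException e) {
            return failure(e);
        }
    }
}
```

The opposite direction, assigning `remoteStorageFetchException` inside the method just as `remoteFetchOpt` is assigned, would be equally consistent; what matters is that both outcomes travel the same way.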



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
+                if (remoteLogReadResult.error.isPresent()) {
+                    Throwable error = remoteLogReadResult.error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = remoteFetch.logReadResult();
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
+                                info.abortedTransactions,
+                                logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If remote fetch bytes  < shareFetch.fetchParams().maxBytes, then we will try for a local read.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
+                    }
+                });
+                nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);

Review Comment:
   nonRemoteFetchTopicPartitionData => acquiredNonRemoteFetchTopicPartition?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.

Review Comment:
   The comment is not very accurate since this method could be called when the request expires.
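To illustrate the reviewer's point, here is a simplified, hypothetical model of a delayed request that can be completed from two paths. It is not Kafka's `DelayedOperation` API. One path is `tryComplete()`, after the remote read callback fires. The other is `onExpiration()`, when the purgatory timer fires. The completion logic can be reached from the expiry path while the remote read is still pending, so it must cope with an unfinished future. `completeRemoteStorageShareFetchRequest()` in the diff does this by calling `cancelRemoteFetchTask()`. All class and method names below are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a delayed share fetch; not Kafka's DelayedOperation API.
public class ExpiryPathSketch {

    static final class SketchDelayedFetch {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final CompletableFuture<String> remoteFetchResult;
        String response; // what would be sent back to the client

        SketchDelayedFetch(CompletableFuture<String> remoteFetchResult) {
            this.remoteFetchResult = remoteFetchResult;
        }

        // Path 1: the remote read callback re-checks the request.
        boolean tryComplete() {
            return remoteFetchResult.isDone() && forceComplete();
        }

        // Path 2: the purgatory timer fires before the remote read finishes.
        void onExpiration() {
            forceComplete();
        }

        // Only the first caller actually runs the completion logic.
        private boolean forceComplete() {
            if (completed.compareAndSet(false, true)) {
                onComplete();
                return true;
            }
            return false;
        }

        // Reached from either path, so the remote result may still be pending here.
        private void onComplete() {
            if (remoteFetchResult.isDone()) {
                response = "remote data: " + remoteFetchResult.join();
            } else {
                remoteFetchResult.cancel(false);
                response = "empty: remote fetch still pending at expiry";
            }
        }
    }

    public static void main(String[] args) {
        SketchDelayedFetch expired = new SketchDelayedFetch(new CompletableFuture<>());
        expired.onExpiration();
        System.out.println(expired.response);

        SketchDelayedFetch normal = new SketchDelayedFetch(CompletableFuture.completedFuture("records"));
        System.out.println(normal.tryComplete() + " " + normal.response);
    }
}
```

The `AtomicBoolean` guard models the "completed by me" check: whichever path wins runs the completion code exactly once. The loser (for example, a late `tryComplete()` after expiry) simply gets `false`.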



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();

Review Comment:
   If we get a remoteStorageFetchException, we shouldn't throw the exception to the caller. Instead, we should just force complete the request with the error.
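Here is a minimal sketch of the suggested shape, using hypothetical stand-ins rather than the PR's real types. When scheduling the remote read fails, the exception is recorded in a field and the request is force-completed. As a result, `maybeProcessRemoteFetch` no longer needs `throws Exception`, and the error reaches the client through the normal completion path. In the PR, that path would presumably be `completeErroneousRemoteShareFetchRequest()`. The `Runnable` parameter standing in for the `asyncRead` call is an assumption made to keep the example self-contained.

```java
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for DelayedShareFetch; names and types are simplified assumptions.
public class ForceCompleteOnErrorSketch {

    private final AtomicBoolean completed = new AtomicBoolean(false);
    private Optional<Exception> remoteStorageFetchException = Optional.empty();
    String response;

    // Stand-in for processRemoteFetchOrException(): returns the scheduling error, if any.
    private Optional<Exception> scheduleRemoteFetch(Runnable scheduleTask) {
        try {
            scheduleTask.run();
            return Optional.empty();
        } catch (Exception e) {
            return Optional.of(e);
        }
    }

    // Suggested shape: no "throws Exception"; a scheduling error force-completes the request.
    boolean maybeProcessRemoteFetch(Runnable scheduleTask) {
        Optional<Exception> exceptionOpt = scheduleRemoteFetch(scheduleTask);
        if (exceptionOpt.isPresent()) {
            remoteStorageFetchException = exceptionOpt;
            return forceComplete();
        }
        return false; // remote read is in flight; wait for its callback or for expiry
    }

    private boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // The recorded error becomes the response instead of escaping to the caller.
    private void onComplete() {
        response = remoteStorageFetchException
            .map(e -> "error: " + e.getClass().getSimpleName())
            .orElse("data");
    }

    public static void main(String[] args) {
        ForceCompleteOnErrorSketch request = new ForceCompleteOnErrorSketch();
        boolean done = request.maybeProcessRemoteFetch(() -> {
            throw new RejectedExecutionException("remote log reader pool is full");
        });
        System.out.println(done + " " + request.response); // true error: RejectedExecutionException
    }
}
```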



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
+                if (remoteLogReadResult.error.isPresent()) {
+                    Throwable error = remoteLogReadResult.error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = remoteFetch.logReadResult();
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
+                                info.abortedTransactions,
+                                logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If remote fetch bytes  < shareFetch.fetchParams().maxBytes, then we will try for a local read.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
+                    }
+                });
+                nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);
+                if (!nonRemoteFetchTopicPartitionData.isEmpty()) {
+                    log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}",
+                        nonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+                    LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog(
+                        nonRemoteFetchTopicPartitionData,
+                        partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, nonRemoteFetchTopicPartitionData.keySet(), nonRemoteFetchTopicPartitionData.size()));
+                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
+                        if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+                            shareFetchPartitionData.add(
+                                new ShareFetchPartitionData(
+                                    entry.getKey(),
+                                    nonRemoteFetchTopicPartitionData.get(entry.getKey()),
+                                    entry.getValue().toFetchPartitionData(false)
+                                )
+                            );
+                        }
+                    }
+                }
+            }
+
+            // Update metric to record acquired to requested partitions.
+            double acquiredRatio = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();
+            if (acquiredRatio > 0)
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100));
+
+            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(
+                shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
+            shareFetch.maybeComplete(remoteFetchResponse);
+            log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
+            Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
+            topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+            handleFetchException(shareFetch, topicIdPartitions, e);
+        } finally {
+            Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
+            topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+            releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
+        }
+    }
+
+    /**
+     * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is
+     * already running as it may force closing opened/cached resources as transaction index.
+     * Note - This function should only be called when we know that there is a remote fetch in-flight/completed.

Review Comment:
   The comment is not very accurate since this method could be called when the request expires.
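Some context on the Javadoc above. Assume `cancelRemoteFetchTask()` delegates to `java.util.concurrent.Future.cancel(false)`, as its description suggests. Then the call behaves the same whether it is reached after a normal completion or on expiry. The self-contained sketch below uses a single-thread `ExecutorService` to show the two cases the comment describes. A task that has not started yet never runs. A task that is already running is not interrupted. The two tasks are plain stand-ins, not Kafka's remote log reader.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone demo of Future.cancel(false); the two tasks are stand-ins for remote reads.
public class CancelWithoutInterruptSketch {

    // Returns {runningTaskWasInterrupted, queuedTaskRan}.
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean runningTaskInterrupted = new AtomicBoolean(false);
        AtomicBoolean queuedTaskRan = new AtomicBoolean(false);
        try {
            // Occupies the only worker thread: a remote read that is already running.
            Future<?> running = pool.submit(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    runningTaskInterrupted.set(true);
                }
            });
            // Waits behind it in the queue: a remote read that has not started yet.
            Future<?> queued = pool.submit(() -> queuedTaskRan.set(true));

            started.await();
            running.cancel(false); // marks it cancelled but does not interrupt the thread
            queued.cancel(false);  // the executor skips it when it reaches the queue head
            release.countDown();   // let the running task finish normally

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] {runningTaskInterrupted.get(), queuedTaskRan.get()};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("running task interrupted: " + result[0] + ", queued task ran: " + result[1]);
    }
}
```

Passing `true` instead would interrupt the running task. The Javadoc's stated reason for avoiding that is the risk of force-closing opened or cached resources such as the transaction index.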



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();

Review Comment:
   We need to rename `remoteStorageFetchMetadataMap` since it's no longer a map.
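Here is a minimal, self-contained sketch of the method with the variable renamed. The types are simplified stand-ins: partition names are `String` and the delayed remote fetch is an `Optional<String>`. The `partitionsAcquired` bookkeeping is omitted. `remoteStorageFetchInfoOpt` is only one possible name, not necessarily the one the PR adopted. Because the loop stops at the first match, the result holds at most one partition, which is why a `...Map` suffix is misleading. The sketch uses a `record`, so it needs Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins: partition names are Strings, a delayed remote fetch is an Optional<String>.
public class FirstRemoteFetchSketch {

    // Stand-in for the PR's TopicPartitionRemoteFetchInfo, with String fields.
    record TopicPartitionRemoteFetchInfo(String topicIdPartition, String remoteStorageFetchInfo) { }

    static Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
            LinkedHashMap<String, Optional<String>> delayedRemoteFetchByPartition) {
        // At most one partition is selected, so the name describes an Optional, not a map.
        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchInfoOpt = Optional.empty();
        for (Map.Entry<String, Optional<String>> entry : delayedRemoteFetchByPartition.entrySet()) {
            if (entry.getValue().isPresent()) {
                remoteStorageFetchInfoOpt = Optional.of(
                    new TopicPartitionRemoteFetchInfo(entry.getKey(), entry.getValue().get()));
                break; // first match in insertion order wins, mirroring the single-partition TODO
            }
        }
        return remoteStorageFetchInfoOpt;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Optional<String>> readResults = new LinkedHashMap<>();
        readResults.put("topicA-0", Optional.empty());
        readResults.put("topicA-1", Optional.of("remote-read-1"));
        readResults.put("topicA-2", Optional.of("remote-read-2"));
        maybePrepareRemoteStorageFetchInfo(readResults)
            .ifPresent(info -> System.out.println("remote fetch for " + info.topicIdPartition()));
    }
}
```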



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for
+                //  a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work,
+                //  we are following the same logic. However, this problem should be addressed as part of KAFKA-19133 which should help us perform
+                //  fetch for multiple remote fetch topic partition in a single share fetch request
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // topic partitions for which fetch would not be happening in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release acquisition lock for the topic partitions that were acquired but were not a part of remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();
+        }
+        // Check if remote fetch can be completed.
+        return maybeCompletePendingRemoteFetch();
+    }
+
+    /**
+     * Returns an option containing an exception if a task for RemoteStorageFetchInfo could not be scheduled successfully else returns empty optional.
+     * @param topicPartitionRemoteFetchInfo - The remote storage fetch topic partition information.
+     * @param replicaManagerReadResponse - The replica manager read response containing log read results for acquired topic partitions
+     */
+    private Optional<Exception> processRemoteFetchOrException(
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition();
+        RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.remoteStorageFetchInfo();
+        LogReadResult logReadResult = replicaManagerReadResponse.get(remoteFetchTopicIdPartition);
+
+        Future<Void> remoteFetchTask;
+        CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>();
+        try {
+            remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead(
+                remoteStorageFetchInfo,
+                result -> {
+                    remoteFetchResult.complete(result);
+                    replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition()));
+                }
+            );
+        } catch (RejectedExecutionException e) {
+            // Return the error if any in scheduling the remote fetch task.
+            log.warn("Unable to fetch data from remote storage", e);
+            return Optional.of(e);
+        } catch (Exception e) {
+            return Optional.of(e);
+        }
+        remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, logReadResult, remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo));
+        return Optional.empty();
+    }
+
+    /**
+     * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent().
+     * The operation can be completed if:
+     * Case a: The partition is in an offline log directory on this broker
+     * Case b: This broker does not know the partition it tries to fetch
+     * Case c: This broker is no longer the leader of the partition it tries to fetch
+     * Case d: The remote storage read request completed (succeeded or failed)
+     * @return boolean representing whether the remote fetch is completed or not.
+     */
+    private boolean maybeCompletePendingRemoteFetch() {
+        boolean canComplete = false;
+
+        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
+        try {
+            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        } catch (KafkaStorageException e) { // Case a
+            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (UnknownTopicOrPartitionException e) { // Case b
+            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        } catch (NotLeaderOrFollowerException e) { // Case c
+            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+            canComplete = true;
+        }
+
+        if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d
+            boolean completedByMe = forceComplete();
+            // If invocation of forceComplete is not successful, then that means the request is already completed
+            // hence release the acquired locks.
+            if (!completedByMe) {
+                releasePartitionLocks(partitionsAcquired.keySet());
+            }
+            return completedByMe;
+        } else
+            return false;
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete()
+     * It should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeErroneousRemoteShareFetchRequest() {
+        try {
+            handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
+        } finally {
+            releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+        }
+
+    }
+
+    private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
+        // Releasing the lock to move ahead with the next request in queue.
+        releasePartitionLocks(topicIdPartitions);
+        // If we have a fetch request completed for a topic-partition, we release the locks for that partition,
+        // then we should check if there is a pending share fetch request for the topic-partition and complete it.
+        // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
+        // we directly call delayedShareFetchPurgatory.checkAndComplete
+        replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition ->
+            replicaManager.completeDelayedShareFetchRequest(
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+    }
+
+    /**
+     * This function completes a share fetch request for which we have identified remoteFetch during tryComplete()
+     * Note - This function should only be called when we know that there is remote fetch in-flight/completed.
+     */
+    private void completeRemoteStorageShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+        try {
+            List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
+            int readableBytes = 0;
+            if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+                RemoteFetch remoteFetch = remoteFetchOpt.get();
+                RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
+                if (remoteLogReadResult.error.isPresent()) {
+                    Throwable error = remoteLogReadResult.error.get();
+                    // If there is any error for the remote fetch topic partition, we populate the error accordingly.
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            remoteFetch.topicIdPartition(),
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+                        )
+                    );
+                } else {
+                    FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get();
+                    TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
+                    LogReadResult logReadResult = remoteFetch.logReadResult();
+                    shareFetchPartitionData.add(
+                        new ShareFetchPartitionData(
+                            topicIdPartition,
+                            partitionsAcquired.get(remoteFetch.topicIdPartition()),
+                            new FetchPartitionData(
+                                logReadResult.error(),
+                                logReadResult.highWatermark(),
+                                logReadResult.leaderLogStartOffset(),
+                                info.records,
+                                Optional.empty(),
+                                logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(),
+                                info.abortedTransactions,
+                                logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+                                false
+                            )
+                        )
+                    );
+                    readableBytes += info.records.sizeInBytes();
+                }
+            } else {
+                cancelRemoteFetchTask();
+            }
+
+            // If the remote fetch read fewer than shareFetch.fetchParams().maxBytes bytes, try a local read for the share partitions not acquired for the remote fetch.
+            if (readableBytes < shareFetch.fetchParams().maxBytes) {
+                // Get the local log read based topic partitions.
+                LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+                sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+                    if (!partitionsAcquired.containsKey(topicIdPartition)) {
+                        nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition);
+                    }
+                });
+                nonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions);
+                if (!nonRemoteFetchTopicPartitionData.isEmpty()) {
+                    log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}",
+                        nonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+                    LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog(
+                        nonRemoteFetchTopicPartitionData,
+                        partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, nonRemoteFetchTopicPartitionData.keySet(), nonRemoteFetchTopicPartitionData.size()));
+                    for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
+                        if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+                            shareFetchPartitionData.add(
+                                new ShareFetchPartitionData(
+                                    entry.getKey(),
+                                    nonRemoteFetchTopicPartitionData.get(entry.getKey()),
+                                    entry.getValue().toFetchPartitionData(false)
+                                )
+                            );
+                        }
+                    }
+                }
+            }
+
+            // Update the metric that records the ratio of acquired to requested partitions.
+            double acquiredRatio = (double) (partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size();
+            if (acquiredRatio > 0)
+                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100));
+
+            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(
+                shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
+            shareFetch.maybeComplete(remoteFetchResponse);
+            log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
+        } catch (RuntimeException e) {

Review Comment:
   Why do we need this? All Kafka exceptions are RuntimeException.
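For context on the premise of this comment: `org.apache.kafka.common.KafkaException` extends `RuntimeException`, so Kafka exceptions are unchecked, the compiler never requires them to be caught, and a `catch (RuntimeException e)` clause already matches every subclass. A minimal, self-contained sketch of that Java behavior follows; `StandInKafkaException` and `CatchDemo` are hypothetical names used only for illustration and are not Kafka classes.

```java
// Hypothetical stand-in for org.apache.kafka.common.KafkaException,
// which (like the real class) extends RuntimeException and is therefore unchecked.
class StandInKafkaException extends RuntimeException {
    StandInKafkaException(String message) {
        super(message);
    }
}

class CatchDemo {
    // Runs the work and reports which path it took. A single
    // catch (RuntimeException e) clause also catches every subclass,
    // including the stand-in Kafka exception above.
    static String handle(Runnable work) {
        try {
            work.run();
            return "completed";
        } catch (RuntimeException e) {
            return "caught " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Unchecked: no throws clause is needed for the lambda to throw this exception.
        System.out.println(handle(() -> { throw new StandInKafkaException("boom"); })); // prints "caught StandInKafkaException"
        System.out.println(handle(() -> { })); // prints "completed"
    }
}
```

Whether the catch in this hunk is redundant depends on the rest of the enclosing try block, which the excerpt does not show; the sketch only demonstrates the language semantics the comment relies on.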


