adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2046211183


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +582,300 @@ Lock lock() {
         return lock;
     }
 
+    // Visible for testing.
+    RemoteFetch remoteFetch() {
+        return remoteFetchOpt.orElse(null);
+    }
+
     // Visible for testing.
     Meter expiredRequestMeter() {
         return expiredRequestMeter;
     }
+
+    private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) {
+        Optional<TopicPartitionRemoteFetchInfo> remoteStorageFetchMetadataMap = Optional.empty();
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            LogReadResult logReadResult = entry.getValue();
+            if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+                // TODO: There is a limitation in remote storage fetch for consumer groups: we can only perform a remote fetch for
+                //  a single topic partition in a fetch request. Since the fetch logic is largely based on how consumer groups work,
+                //  we follow the same approach here. This limitation should be addressed as part of KAFKA-19133, which should let us
+                //  fetch multiple remote fetch topic partitions in a single share fetch request.
+                remoteStorageFetchMetadataMap = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult.info().delayedRemoteStorageFetch.get()));
+                partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition));
+                break;
+            }
+        }
+        return remoteStorageFetchMetadataMap;
+    }
+
+    private boolean maybeProcessRemoteFetch(
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+        TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo,
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
+    ) throws Exception {
+        topicPartitionData.keySet().forEach(topicIdPartition -> {
+            // Topic partitions for which fetch will not happen in this share fetch request.
+            if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) {
+                // Release the acquisition lock for topic partitions that were acquired but are not part of the remote fetch.
+                releasePartitionLocks(Set.of(topicIdPartition));
+            }
+        });
+        Optional<Exception> exceptionOpt = processRemoteFetchOrException(topicPartitionRemoteFetchInfo, replicaManagerReadResponse);
+        if (exceptionOpt.isPresent()) {
+            remoteStorageFetchException = exceptionOpt;
+            throw exceptionOpt.get();

Review Comment:
   We are not throwing the exception to the caller; we force-complete the request with the error. When an exception is thrown in `maybeProcessRemoteFetch`, it is handled in the [catch](https://github.com/apache/kafka/pull/19437/files#diff-d835cdc01e77905316584ce9e6e21a060cb3d36efa717d4b822b16744e4d713aR324) in `tryComplete`, which calls `forceComplete`. Once we enter `onComplete`, execution reaches this [line](https://github.com/apache/kafka/pull/19437/files#diff-d835cdc01e77905316584ce9e6e21a060cb3d36efa717d4b822b16744e4d713aR197) and calls `completeErroneousRemoteShareFetchRequest`, which completes the share fetch request with the exception.
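   
   For readers tracing this flow, here is a minimal, runnable sketch of the pattern described above (the method names `tryComplete`, `forceComplete`, `onComplete`, and `completeErroneousRemoteShareFetchRequest` follow the PR; the simplified bodies and the `RemoteFetchFlowSketch` class are illustrative assumptions, not the actual `DelayedShareFetch` implementation):
   
   ```java
   import java.util.Optional;
   
   // Illustrative sketch only: shows how an exception thrown while processing a
   // remote fetch is recorded and the request force-completed with the error,
   // instead of the exception propagating to the caller.
   class RemoteFetchFlowSketch {
       private Optional<Exception> remoteStorageFetchException = Optional.empty();
   
       boolean tryComplete() {
           try {
               maybeProcessRemoteFetch(); // may throw, as in the PR
               return true;
           } catch (Exception e) {
               // Record the error and force-complete rather than rethrowing.
               remoteStorageFetchException = Optional.of(e);
               return forceComplete(); // forceComplete() drives onComplete()
           }
       }
   
       boolean forceComplete() {
           onComplete();
           return true;
       }
   
       void onComplete() {
           if (remoteStorageFetchException.isPresent()) {
               // Complete the share fetch request exceptionally.
               completeErroneousRemoteShareFetchRequest(remoteStorageFetchException.get());
           }
           // ... otherwise the normal completion path would run here ...
       }
   
       private void maybeProcessRemoteFetch() throws Exception {
           throw new Exception("simulated remote storage fetch failure");
       }
   
       private void completeErroneousRemoteShareFetchRequest(Exception e) {
           System.out.println("Completed share fetch with error: " + e.getMessage());
       }
   
       public static void main(String[] args) {
           new RemoteFetchFlowSketch().tryComplete();
       }
   }
   ```
   
   The key point is that `tryComplete` never lets the exception escape: the catch converts the failure into a normal purgatory completion, and `onComplete` routes it to the erroneous-completion path.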


